基于 Go 1.4,相關(guān)文件位于 src/runtime 目錄。文章忽略了 32bit 代碼,有興趣的可自行查看源碼文件。為便于閱讀,示例代碼做過裁剪。
Go 內(nèi)存分配器基于 tcmalloc 模型,這在 malloc.h 頭部注釋中有明確說明。
Memory allocator, based on tcmalloc.
http://goog-perftools.sourceforge.net/doc/tcmalloc.html
核心目標(biāo)很簡(jiǎn)單:
分配器以頁為單位向操作系統(tǒng)申請(qǐng)大塊內(nèi)存。這些大塊內(nèi)存由 n 個(gè)地址連續(xù)的頁組成,并用名為 span 的對(duì)象進(jìn)行管理。
malloc.h
PageShift" = 13,
PageSize" = 1<<PageShift, // 8192 bytes
當(dāng)需要時(shí),span 所管理內(nèi)存被切分成多個(gè)大小相等的小塊,每個(gè)小塊可存儲(chǔ)一個(gè)對(duì)象,故稱作 object。
分配器以 32KB 為界,將對(duì)象分為大小兩種。
malloc.h
MaxSmallSize = 32<<10,
大對(duì)象直接找一個(gè)大小合適的 span,這個(gè)無需多言。小對(duì)象則以 8 的倍數(shù)分為不同大小等級(jí) (size class)。比如 class1 為 8 字節(jié),可存儲(chǔ) 1 ~ 8 字節(jié)大小的對(duì)象。
NumSizeClasses = 67,
當(dāng)然,實(shí)際的對(duì)應(yīng)規(guī)則并不是連續(xù)和固定的,會(huì)根據(jù)一些經(jīng)驗(yàn)和測(cè)試結(jié)果進(jìn)行調(diào)整,以獲得最佳的性能和內(nèi)存利用率。
malloc.h
// Size classes. Computed and initialized by InitSizes.
//
// SizeToClass(0 <= n <= MaxSmallSize) returns the size class,
//" 1 <= sizeclass < NumSizeClasses, for n.
//" Size class 0 is reserved to mean "not small".
//
// class_to_size[i] = largest size in class i
// class_to_allocnpages[i] = number of pages to allocate when
//" making new objects in class i
int32" runtime·SizeToClass(int32);
extern"int32" runtime·class_to_size[NumSizeClasses];
extern"int32" runtime·class_to_allocnpages[NumSizeClasses];
extern"int8" runtime·size_to_class8[1024/8 + 1];
extern"int8" runtime·size_to_class128[(MaxSmallSize-1024)/128 + 1];
為了管理好內(nèi)存,分配器使用三級(jí)組件來完成不同操作。
簡(jiǎn)單描述一下內(nèi)存分配和回收流程。
分配流程:
回收流程:
從 heap 申請(qǐng)和回收 span 的過程中,分配器會(huì)嘗試合并地址相鄰的 span 塊,以形成更大內(nèi)存塊,減少碎片。
分配器管理算法依賴連續(xù)內(nèi)存地址。因此,在初始化時(shí),分配器會(huì)預(yù)留一塊巨大的虛擬地址空間。該空間被成三個(gè)部分:
在 64 位系統(tǒng)下,arena 最大容量是 128GB,bitmap 8GB,spans 128MB。這些內(nèi)存并非一次性分配,而是隨著 arena 線性增加,每個(gè)區(qū)域都有指針標(biāo)記當(dāng)前分配位置。
malloc.h
struct MHeap
{
// span lookup
MSpan** spans;
uintptr spans_mapped;
// range of addresses we might see in the heap
byte *bitmap;
uintptr bitmap_mapped;
byte *arena_start;
byte *arena_used;
byte *arena_end;
bool arena_reserved;
};
虛擬地址預(yù)留操作并非物理內(nèi)存分配,因此看到 “Hello, World” 消耗上百 GB “內(nèi)存”,無需大驚小怪。
在運(yùn)行時(shí)初始化時(shí),會(huì)調(diào)用內(nèi)存分配器初始化函數(shù)。
proc.c
void runtime·schedinit(void)
{
runtime·mallocinit();
}
malloc.c
void runtime·mallocinit(void)
{
// 初始化 size class 反查表。
runtime·InitSizes();
// 64-bit
if(sizeof(void*) == 8 && (limit == 0 || limit > (1<<30))) {
arena_size = MaxMem; // 128GB
bitmap_size = arena_size / (sizeof(void*)*8/4); // 8GB
spans_size = arena_size / PageSize * sizeof(runtime·mheap.spans[0]);
spans_size = ROUND(spans_size, PageSize); // 128MB
// 嘗試從 0xc000000000 開始設(shè)置保留地址。
// 如果失敗,則嘗試 0x1c000000000 ~ 0x7fc000000000。
for(i = 0; i <= 0x7f; i++) {
p = (void*)(i<<40 | 0x00c0ULL<<32);
p_size = bitmap_size + spans_size + arena_size + PageSize;
p = runtime·SysReserve(p, p_size, &reserved);
if(p != nil)
break;
}
}
// 32-bit
if (p == nil) {
// 忽略
}
// 按 PageSize 對(duì)齊地址。
// 分配器使用 Address<<PageShift 作為 PageID。
p1 = (byte*)ROUND((uintptr)p, PageSize);
// 設(shè)定不同區(qū)域的起始地址。
runtime·mheap.spans = (MSpan**)p1;
runtime·mheap.bitmap = p1 + spans_size;
runtime·mheap.arena_start = p1 + spans_size + bitmap_size;
runtime·mheap.arena_used = runtime·mheap.arena_start;
runtime·mheap.arena_end = p + p_size;
runtime·mheap.arena_reserved = reserved;
// 初始化 heap 和當(dāng)前 cache。
runtime·MHeap_Init(&runtime·mheap);
g->m->mcache = runtime·allocmcache();
}
內(nèi)存地址預(yù)留操作通過 mmap PORT_NONE 實(shí)現(xiàn)。不過,在 darwin/OSX 中,并未使用 MAP_FIXED 參數(shù),因此未必從 0xc000000000 開始。
mem_darwin.c
void* runtime·SysReserve(void *v, uintptr n, bool *reserved)
{
void *p;
*reserved = true;
p = runtime·mmap(v, n, PROT_NONE, MAP_ANON|MAP_PRIVATE, -1, 0);
if(p < (void*)4096)
return nil;
return p;
}
分配器根對(duì)象 heap 的初始化工作,主要是幾個(gè) span 管理鏈表和 central 數(shù)組的創(chuàng)建。
malloc.h
MaxMHeapList = 1<<(20 - PageShift), // Maximum page length for fixed-size list in MHeap.
struct MHeap
{
MSpan free[MaxMHeapList]; // free lists of given length
MSpan busy[MaxMHeapList]; // busy lists of large objects of given length
MSpan freelarge; // free lists length >= MaxMHeapList
MSpan busylarge; // busy lists of large objects length >= MaxMHeapList
struct MHeapCentral {
MCentral mcentral;
byte pad[CacheLineSize];
} central[NumSizeClasses];
};
其中,free 和 busy 數(shù)組以 span 頁數(shù)為序號(hào)管理多個(gè)鏈表。當(dāng) central 有需要時(shí),只需從 free 找到頁數(shù)合適的鏈表,從中提取可用 span 即可。busy 記錄的自然是已經(jīng)被使用的 span。
至于 large 鏈表,用于保存所有超出 free/busy 頁數(shù)限制的 span。
mheap.c
void runtime·MHeap_Init(MHeap *h)
{
uint32 i;
// 初始化一些管理類型的固定分配器。
runtime·FixAlloc_Init(&h->spanalloc, sizeof(MSpan), RecordSpan, ...);
runtime·FixAlloc_Init(&h->cachealloc, sizeof(MCache), ...);
runtime·FixAlloc_Init(&h->specialfinalizeralloc, sizeof(SpecialFinalizer), ...);
runtime·FixAlloc_Init(&h->specialprofilealloc, sizeof(SpecialProfile), ...);
// 初始化 free/busy 數(shù)組。
for(i=0; i<nelem(h->free); i++) {
runtime·MSpanList_Init(&h->free[i]);
runtime·MSpanList_Init(&h->busy[i]);
}
// 初始化 large 鏈表。
runtime·MSpanList_Init(&h->freelarge);
runtime·MSpanList_Init(&h->busylarge);
// 創(chuàng)建所有等級(jí)的 central 對(duì)象。
for(i=0; i<nelem(h->central); i++)
runtime·MCentral_Init(&h->central[i].mcentral, i);
}
像 span、cache 這類管理對(duì)象,并不從 arena 區(qū)域分配,而是使用專門的 FixAlloc 分配器單獨(dú)管理。其具體實(shí)現(xiàn)細(xì)節(jié)可參考后續(xù)章節(jié)。
在 span 內(nèi)部有兩個(gè)指針,用于將多個(gè)對(duì)象串成雙向鏈表。
malloc.h
struct MSpan
{
MSpan *next; // in a span linked list
MSpan *prev; // in a span linked list
pageID start; // starting page number
uintptr npages; // number of pages in span
MLink *freelist; // list of free objects
uint8 sizeclass; // size class
uint8 state; // MSpanInUse etc
uintptr elemsize; // computed from sizeclass or from npages
};
mheap.c
void runtime·MSpanList_Init(MSpan *list)
{
list->state = MSpanListHead;
list->next = list;
list->prev = list;
}
至于 central,同樣是完成兩個(gè) span 管理鏈表的初始化操作。其中 nonempty 鏈表保存有剩余 object 空間,等待被 cache 獲取的 span。而 empty 則保存沒有剩余空間或已被 cache 獲取的 span。
malloc.h
struct MCentral
{
int32 sizeclass;
MSpan nonempty; // list of spans with a free object
MSpan empty; // list of spans with no free objects (or cached in an MCache)
};
mcentral.c
void runtime·MCentral_Init(MCentral *c, int32 sizeclass)
{
c->sizeclass = sizeclass;
runtime·MSpanList_Init(&c->nonempty);
runtime·MSpanList_Init(&c->empty);
}
最后,用固定分配器創(chuàng)建 cache 對(duì)象,并初始化其 alloc 數(shù)組。
malloc.h
struct MCache
{
MSpan* alloc[NumSizeClasses]; // spans to allocate from
};
mcache.c
// dummy MSpan that contains no free objects.
MSpan runtime·emptymspan;
MCache* runtime·allocmcache(void)
{
// 使用固定分配器創(chuàng)建 cache 對(duì)象。
c = runtime·FixAlloc_Alloc(&runtime·mheap.cachealloc);
// 初始化內(nèi)存。
runtime·memclr((byte*)c, sizeof(*c));
// 初始化 alloc 數(shù)組,用來保存從 central 獲取的不同等級(jí) span 對(duì)象。
for(i = 0; i < NumSizeClasses; i++)
c->alloc[i] = &runtime·emptymspan;
return c;
}
相關(guān)包裝函數(shù),最終通過 mallocgc 函數(shù)完成內(nèi)存分配操作。
malloc.go
func newobject(typ *_type) unsafe.Pointer {
return mallocgc(uintptr(typ.size), typ, flags)
}
func newarray(typ *_type, n uintptr) unsafe.Pointer {
return mallocgc(uintptr(typ.size)*n, typ, flags)
}
在分配過程中,需要判斷大小對(duì)象,還有對(duì)小于 16 字節(jié)的微小對(duì)象做額外處理。
malloc.h
MaxSmallSize = 32<<10,
TinySize = 16,
TinySizeClass = 2,
malloc.go
func mallocgc(size uintptr, typ *_type, flags uint32) unsafe.Pointer {
// 當(dāng)前 cache 對(duì)象。
c := gomcache()
var s *mspan
var x unsafe.Pointer
// 判斷是否小對(duì)象。
if size <= maxSmallSize {
// 對(duì)于小于 16 字節(jié)的微小對(duì)象,做額外處理。
if flags&flagNoScan != 0 && size < maxTinySize {
// 獲取當(dāng)前 cache tiny 塊剩余大小。
tinysize := uintptr(c.tinysize)
// 如果 tiny 塊空間足夠...
if size <= tinysize {
tiny := unsafe.Pointer(c.tiny)
// 地址對(duì)齊。
if size&7 == 0 {
tiny = roundup(tiny, 8)
} else if size&3 == 0 {
tiny = roundup(tiny, 4)
} else if size&1 == 0 {
tiny = roundup(tiny, 2)
}
// 實(shí)際大小 = 對(duì)象大小 + 對(duì)齊所需大小(對(duì)齊后地址 - 原地址)。
size1 := size + (uintptr(tiny) - uintptr(unsafe.Pointer(c.tiny)))
// 再次判斷空間是否足夠...
if size1 <= tinysize {
// x = 對(duì)齊后地址
x = tiny
// 調(diào)整剩余空間記錄。
c.tiny = (*byte)(add(x, size))
c.tinysize -= uintptr(size1)
c.local_tinyallocs++
return x
}
}
// 如果 tiny 塊空間不足,則從 alloc[2] 獲取新的 tiny/object 塊。
s = c.alloc[tinySizeClass]
v := s.freelist
// 如果該 span 沒有可用 object ...
if v == nil {
// 從 central 獲取新的 span。
mp := acquirem()
mp.scalararg[0] = tinySizeClass
onM(mcacheRefill_m)
releasem(mp)
// 獲取 tiny/object 塊。
s = c.alloc[tinySizeClass]
v = s.freelist
}
// 提取 tiny 塊后,調(diào)整 span.freelist 鏈表。
s.freelist = v.next
s.ref++
// 初始化 tiny 塊內(nèi)存。
x = unsafe.Pointer(v)
(*[2]uint64)(x)[0] = 0
(*[2]uint64)(x)[1] = 0
// 如果新 tiny 塊剩余空間大于原 tiny 塊,那么就換一下。
if maxTinySize-size > tinysize {
// 調(diào)整剩余位置指針和大小。
c.tiny = (*byte)(add(x, size))
c.tinysize = uintptr(maxTinySize - size)
}
size = maxTinySize
} else { // 普通小對(duì)象
var sizeclass int8
// 計(jì)算對(duì)應(yīng)的等級(jí)。
if size <= 1024-8 {
sizeclass = size_to_class8[(size+7)>>3]
} else {
sizeclass = size_to_class128[(size-1024+127)>>7]
}
size = uintptr(class_to_size[sizeclass])
// 從 alloc 數(shù)組獲取對(duì)應(yīng)的 span。
s = c.alloc[sizeclass]
// 從 span 鏈表提取 object。
v := s.freelist
// 如果 span 沒有剩余 object,則從 central 獲取新的 span。
if v == nil {
mp := acquirem()
mp.scalararg[0] = uintptr(sizeclass)
onM(mcacheRefill_m)
releasem(mp)
s = c.alloc[sizeclass]
v = s.freelist
}
// 調(diào)整 span 鏈表。
s.freelist = v.next
s.ref++
// 初始化內(nèi)存。
x = unsafe.Pointer(v)
if flags&flagNoZero == 0 {
v.next = nil
if size > 2*ptrSize && ((*[2]uintptr)(x))[1] != 0 {
memclr(unsafe.Pointer(v), size)
}
}
}
c.local_cachealloc += intptr(size)
} else { // 大對(duì)象
mp := acquirem()
mp.scalararg[0] = uintptr(size)
mp.scalararg[1] = uintptr(flags)
// 直接從 heap 分配一個(gè)適用的 span。
// onM 是切換到 M.g0 棧執(zhí)行函數(shù),相關(guān)細(xì)節(jié)參考后續(xù)章節(jié)。
onM(largeAlloc_m)
s = (*mspan)(mp.ptrarg[0])
mp.ptrarg[0] = nil
releasem(mp)
x = unsafe.Pointer(uintptr(s.start << pageShift))
size = uintptr(s.elemsize)
}
// 在 bitmap 做標(biāo)記。
{
arena_start := uintptr(unsafe.Pointer(mheap_.arena_start))
off := (uintptr(x) - arena_start) / ptrSize
xbits := (*uint8)(unsafe.Pointer(arena_start - off/wordsPerBitmapByte - 1))
shift := (off % wordsPerBitmapByte) * gcBits
// ...
}
marked:
// 檢查分配計(jì)數(shù)器,以決定是否觸發(fā)垃圾回收操作。
if memstats.heap_alloc >= memstats.next_gc {
gogc(0)
}
return x
}
函數(shù)雖然有點(diǎn)長(zhǎng),但不算太復(fù)雜。
malloc.h
struct MCache
{
// Allocator cache for tiny objects w/o pointers.
byte* tiny;
uintptr tinysize;
MSpan* alloc[NumSizeClasses]; // spans to allocate from
};
除基本的分配操作外,還需要關(guān)注內(nèi)存不足時(shí)的 “擴(kuò)張” 過程。這需要一點(diǎn)耐心和細(xì)心。
首先,當(dāng) cache.alloc[] 中對(duì)應(yīng)的 span 沒有剩余 object 時(shí),會(huì)觸發(fā)從 central 獲取新span 操作。
malloc.c
void runtime·mcacheRefill_m(void)
{
runtime·MCache_Refill(g->m->mcache, (int32)g->m->scalararg[0]);
}
mcache.c
MSpan* runtime·MCache_Refill(MCache *c, int32 sizeclass)
{
MSpan *s;
// 當(dāng)前沒有剩余空間的 span。
s = c->alloc[sizeclass];
if(s->freelist != nil)
runtime·throw("refill on a nonempty span");
// 取消 incache 標(biāo)記。
if(s != &runtime·emptymspan)
s->incache = false;
// 從 heap.central[] 數(shù)組找到對(duì)應(yīng)的 central,并獲取新的 span。
s = runtime·MCentral_CacheSpan(&runtime·mheap.central[sizeclass].mcentral);
// 保存到 cache.alloc 數(shù)組。
c->alloc[sizeclass] = s;
return s;
}
從 central 新獲取的 span 會(huì)替代原有對(duì)象,被保存到 alloc 數(shù)組中。
需要提前說明一點(diǎn)背景知識(shí):從 Go 1.3 開始,垃圾回收算法就有很大變動(dòng)。其中標(biāo)記階段需要執(zhí)行 StopTheWorld,然后用多線程并發(fā)執(zhí)行標(biāo)記操作。待標(biāo)記結(jié)束后,立即恢復(fù)StartTheWorld,用單獨(dú)的 goroutine 執(zhí)行清理操作。
因此在執(zhí)行 CacheSpan 時(shí),某些 span 可能還未完成清理。此時(shí)主動(dòng)觸發(fā)回收操作,有助于提高內(nèi)存復(fù)用率,避免向操作系統(tǒng)過度申請(qǐng)內(nèi)存。
malloc.h
sweep generation:
if sweepgen == h->sweepgen - 2, the span needs sweeping
if sweepgen == h->sweepgen - 1, the span is currently being swept
if sweepgen == h->sweepgen, the span is swept and ready to use
h->sweepgen is incremented by 2 after every GC
mcentral.c
MSpan* runtime·MCentral_CacheSpan(MCentral *c)
{
// 當(dāng)前垃圾回收代齡 (隨每次回收操作遞增)。
sg = runtime·mheap.sweepgen;
retry:
// 嘗試從 nonempty 鏈表中獲取可用 span。
for(s = c->nonempty.next; s != &c->nonempty; s = s->next) {
// 如果 span 標(biāo)記為等待回收,那么主動(dòng)執(zhí)行清理操作。
if(s->sweepgen == sg-2 && runtime·cas(&s->sweepgen, sg-2, sg-1)) {
// 將 span 移動(dòng)到鏈表尾部。
runtime·MSpanList_Remove(s);
runtime·MSpanList_InsertBack(&c->empty, s);
// 執(zhí)行垃圾清理。
runtime·MSpan_Sweep(s, true);
goto havespan;
}
// 如果正在后臺(tái)回收,則跳過。
if(s->sweepgen == sg-1) {
// the span is being swept by background sweeper, skip
continue;
}
// 可用 span,將其轉(zhuǎn)移到 empty 鏈表。
runtime·MSpanList_Remove(s);
runtime·MSpanList_InsertBack(&c->empty, s);
goto havespan;
}
// 嘗試從 emtpy 鏈表獲取 span,目標(biāo)是那些等待清理的 span。
for(s = c->empty.next; s != &c->empty; s = s->next) {
// 如果是等待回收的 span,主動(dòng)執(zhí)行回收操作。
if(s->sweepgen == sg-2 && runtime·cas(&s->sweepgen, sg-2, sg-1)) {
// 將該 span 移到 empty 鏈表尾部。
runtime·MSpanList_Remove(s);
runtime·MSpanList_InsertBack(&c->empty, s);
// 執(zhí)行垃圾清理操作。
runtime·MSpan_Sweep(s, true);
// 如果回收后 freelist 鏈表不為空,表示有可用空間。
if(s->freelist != nil)
goto havespan;
goto retry;
}
// 如果正在后臺(tái)回收,跳過。
if(s->sweepgen == sg-1) {
continue;
}
// 處理過的 span,其代齡都已經(jīng)標(biāo)記為 sg,終止嘗試。
break;
}
// 如果 central 中沒有找到可用 span,則向 heap 獲取新的 span。
s = MCentral_Grow(c);
if(s == nil)
return nil;
// 將 span 插入到 empty 鏈表。
runtime·MSpanList_InsertBack(&c->empty, s);
havespan:
// 設(shè)置待返回 span 的相關(guān)屬性。
cap = (s->npages << PageShift) / s->elemsize;
n = cap - s->ref;
// 標(biāo)記被 cache 使用。
s->incache = true;
return s;
}
相比 Go 1.3,cache 部分又做了很大的改進(jìn)。代碼更加簡(jiǎn)潔,流程也更加清晰。
而當(dāng) central 空間不足時(shí),就需要從 heap 獲取新 span 來完成擴(kuò)張操作。這其中就包括對(duì) span 所管理內(nèi)存進(jìn)行切分,形成 object freelist 鏈表。
mcentral.c
static MSpan* MCentral_Grow(MCentral *c)
{
MLink **tailp, *v;
byte *p;
MSpan *s;
// 計(jì)算所需 span 的大小信息。
npages = runtime·class_to_allocnpages[c->sizeclass];
size = runtime·class_to_size[c->sizeclass];
n = (npages << PageShift) / size;
// 從 heap 獲取 span。
s = runtime·MHeap_Alloc(&runtime·mheap, npages, c->sizeclass, 0, 1);
if(s == nil)
return nil;
// 將 span 所管理的內(nèi)存切分成 freelist/object 鏈表。
tailp = &s->freelist;
p = (byte*)(s->start << PageShift); // 起始地址。PageID(start) = p >> PageShift
s->limit = p + size*n;
for(i=0; i<n; i++) {
v = (MLink*)p;
*tailp = v;
tailp = &v->next;
p += size;
}
*tailp = nil;
// 標(biāo)記。
runtime·markspan((byte*)(s->start<<PageShift), size, n, ...));
return s;
}
前面在 mallocgc 中提及的大對(duì)象分配,也是用的 MHeap_Alloc 函數(shù)。
malloc.c
void runtime·largeAlloc_m(void)
{
size = g->m->scalararg[0];
npages = size >> PageShift;
s = runtime·MHeap_Alloc(&runtime·mheap, npages, 0, 1, !(flag & FlagNoZero));
g->m->ptrarg[0] = s;
}
mheap.c
MSpan* runtime·MHeap_Alloc(MHeap *h, uintptr npage, int32 sizeclass, bool large, ...)
{
// 判斷是否在 g0 棧執(zhí)行。
if(g == g->m->g0) {
s = mheap_alloc(h, npage, sizeclass, large);
} else {
...
}
return s;
}
static MSpan* mheap_alloc(MHeap *h, uintptr npage, int32 sizeclass, bool large)
{
MSpan *s;
// 如果垃圾回收操作未結(jié)束,那么嘗試主動(dòng)收回一些空間,以避免內(nèi)存過度增長(zhǎng)。
// we need to sweep and reclaim at least n pages.
if(!h->sweepdone)
MHeap_Reclaim(h, npage);
// 返回可用 span。
s = MHeap_AllocSpanLocked(h, npage);
if(s != nil) {
// 標(biāo)記代齡等狀態(tài)。
runtime·atomicstore(&s->sweepgen, h->sweepgen);
s->state = MSpanInUse;
s->freelist = nil;
s->ref = 0;
s->sizeclass = sizeclass;
s->elemsize = (sizeclass==0
s->npages<<PageShift : runtime·class_to_size[sizeclass]);
// 如果是大對(duì)象...
if(large) {
mstats.heap_objects++;
mstats.heap_alloc += npage<<PageShift;
// 根據(jù)頁數(shù),插入到合適的 busy 鏈表。
if(s->npages < nelem(h->free))
runtime·MSpanList_InsertBack(&h->busy[s->npages], s);
else
runtime·MSpanList_InsertBack(&h->busylarge, s);
}
}
return s;
}
從 heap 獲取 span 算法:
mheap.c
static MSpan* MHeap_AllocSpanLocked(MHeap *h, uintptr npage)
{
uintptr n;
MSpan *s, *t;
pageID p;
// 以頁數(shù)為序號(hào),從 heap.free[] 中查找鏈表。
// 如果當(dāng)前鏈表沒有可用 span,則從頁數(shù)更大的鏈表中提取。
for(n=npage; n < nelem(h->free); n++) {
if(!runtime·MSpanList_IsEmpty(&h->free[n])) {
s = h->free[n].next;
goto HaveSpan;
}
}
// 如果 free 所有鏈表都沒找到合適的 span,則嘗試更大的 large 鏈表。
if((s = MHeap_AllocLarge(h, npage)) == nil) {
// 還沒找到,就只能新申請(qǐng)內(nèi)存了。
if(!MHeap_Grow(h, npage))
return nil;
// 重新查找合適的 span。
// 每次向操作系統(tǒng)申請(qǐng)內(nèi)存最少 1MB/128Pages,而 heap.free 最大下標(biāo) 127,
// 因此 FreeSpanLocked 函數(shù)會(huì)將其放到 freelarge 鏈表中。
if((s = MHeap_AllocLarge(h, npage)) == nil)
return nil;
}
HaveSpan:
// 將找到的 span 從 free 鏈表中移除。
runtime·MSpanList_Remove(s);
// 如果該 span 曾釋放過物理內(nèi)存,那么重新映射。
if(s->npreleased > 0) {
runtime·SysUsed((void*)(s->start<<PageShift), s->npages<<PageShift);
mstats.heap_released -= s->npreleased<<PageShift;
s->npreleased = 0;
}
// 如果返回的 span 頁數(shù)多于需要 ...
if(s->npages > npage) {
// 新建一個(gè) span 對(duì)象 t,用來管理尾部多余內(nèi)存空間。
t = runtime·FixAlloc_Alloc(&h->spanalloc);
runtime·MSpan_Init(t, s->start + npage, s->npages - npage);
// 調(diào)整實(shí)際所需的內(nèi)存大小。
s->npages = npage;
p = t->start;
p -= ((uintptr)h->arena_start>>PageShift);
// 在 spans 區(qū)域標(biāo)記 span 指針。
if(p > 0)
h->spans[p-1] = s;
h->spans[p] = t;
h->spans[p+t->npages-1] = t;
// 將切出來的多余 span,重新放回 heap 管理鏈表中。
MHeap_FreeSpanLocked(h, t, false, false);
s->state = MSpanFree;
}
// 在 spans 中標(biāo)記待所有頁對(duì)應(yīng)指針。
p = s->start;
p -= ((uintptr)h->arena_start>>PageShift);
for(n=0; n<npage; n++)
h->spans[p+n] = s;
return s;
}
當(dāng)找到的 span 大小超出預(yù)期時(shí),分配器會(huì)執(zhí)行切割操作,將多余的內(nèi)存做成新 span 放回 heap 管理鏈表中。
從 large 里查找 span 的算法被稱作 BestFit。很簡(jiǎn)單,通過循環(huán)遍歷,找到大小最合適的目標(biāo)。
mheap.c
MHeap_AllocLarge(MHeap *h, uintptr npage)
{
return BestFit(&h->freelarge, npage, nil);
}
static MSpan* BestFit(MSpan *list, uintptr npage, MSpan *best)
{
MSpan *s;
for(s=list->next; s != list; s=s->next) {
if(s->npages < npage)
continue;
if(best == nil
|| s->npages < best->npages
|| (s->npages == best->npages && s->start < best->start))
best = s;
}
return best;
}
接著看看將 span 放回 heap 管理鏈表的 FreeSpanLocked 操作。
mheap.c
static void MHeap_FreeSpanLocked(MHeap *h, MSpan *s, bool acctinuse, bool acctidle)
{
MSpan *t;
pageID p;
// 修正狀態(tài)標(biāo)記。
s->state = MSpanFree;
// 從當(dāng)前鏈表中移除。
runtime·MSpanList_Remove(s);
// 這兩個(gè)參數(shù)會(huì)影響垃圾回收的物理內(nèi)存釋放操作。
s->unusedsince = runtime·nanotime();
s->npreleased = 0;
// 實(shí)際地址。
p = s->start;
p -= (uintptr)h->arena_start >> PageShift;
// 通過 heap.spans 檢查左側(cè)相鄰 span。
// 如果左側(cè)相鄰 span 也是空閑狀態(tài),則合并。
if(p > 0 && (t = h->spans[p-1]) != nil && t->state != MSpanInUse &&
t->state != MSpanStack) {
// 修正屬性。
s->start = t->start;
s->npages += t->npages;
s->npreleased = t->npreleased; // absorb released pages
s->needzero |= t->needzero;
// 新起始地址。
p -= t->npages;
// 重新標(biāo)記 spans。
h->spans[p] = s;
// 釋放左側(cè) span 原對(duì)象。
runtime·MSpanList_Remove(t);
t->state = MSpanDead;
runtime·FixAlloc_Free(&h->spanalloc, t);
}
// 嘗試合并右側(cè) span。
if((p+s->npages)*sizeof(h->spans[0]) < h->spans_mapped &&
(t = h->spans[p+s->npages]) != nil &&
t->state != MSpanInUse && t->state != MSpanStack) {
s->npages += t->npages;
s->npreleased += t->npreleased;
s->needzero |= t->needzero;
h->spans[p + s->npages - 1] = s;
runtime·MSpanList_Remove(t);
t->state = MSpanDead;
runtime·FixAlloc_Free(&h->spanalloc, t);
}
// 根據(jù) span 頁數(shù),插入到合適的鏈表中。
if(s->npages < nelem(h->free))
runtime·MSpanList_Insert(&h->free[s->npages], s);
else
runtime·MSpanList_Insert(&h->freelarge, s);
}
在此,我們看到了 heap.spans 的作用。合并零散內(nèi)存塊,以提供更大復(fù)用空間,這有助于減少內(nèi)存碎片,是內(nèi)存管理算法的一個(gè)重要設(shè)計(jì)目標(biāo)。
最后,就是剩下如何向操作系統(tǒng)申請(qǐng)新的內(nèi)存了。
malloc.h
HeapAllocChunk = 1<<20," " // Chunk size for heap growth
mheap.c
static bool MHeap_Grow(MHeap *h, uintptr npage)
{
// 每次申請(qǐng)的內(nèi)存總是 64KB 的倍數(shù),最小 1MB。
npage = ROUND(npage, (64<<10)/PageSize);
ask = npage<<PageShift;
if(ask < HeapAllocChunk)
ask = HeapAllocChunk;
// 申請(qǐng)內(nèi)存。
v = runtime·MHeap_SysAlloc(h, ask);
// 創(chuàng)建新的 span 對(duì)象進(jìn)行管理。
s = runtime·FixAlloc_Alloc(&h->spanalloc);
runtime·MSpan_Init(s, (uintptr)v>>PageShift, ask>>PageShift);
p = s->start;
p -= ((uintptr)h->arena_start>>PageShift);
// 在 heap.spans 中標(biāo)記地址。
h->spans[p] = s;
h->spans[p + s->npages - 1] = s;
// 設(shè)置狀態(tài)。
runtime·atomicstore(&s->sweepgen, h->sweepgen);
s->state = MSpanInUse;
// 放回 heap 的管理鏈表,嘗試執(zhí)行合并操作。
MHeap_FreeSpanLocked(h, s, false, true);
return true;
}
申請(qǐng)時(shí),需判斷目標(biāo)地址是否在 arena 范圍內(nèi),且必須從 arena_used 開始。
malloc.c
void* runtime·MHeap_SysAlloc(MHeap *h, uintptr n)
{
// 在 arena 范圍內(nèi)。
if(n <= h->arena_end - h->arena_used) {
// 使用 arena_used 地址。
p = h->arena_used;
runtime·SysMap(p, n, h->arena_reserved, &mstats.heap_sys);
// 調(diào)整下一次分配位置。
h->arena_used += n;
// 同步增加 spans、bitmap 管理內(nèi)存。
runtime·MHeap_MapBits(h);
runtime·MHeap_MapSpans(h);
return p;
}
...
}
mem_linux.c
void runtime·SysMap(void *v, uintptr n, bool reserved, uint64 *stat)
{
p = runtime·mmap(v, n, PROT_READ|PROT_WRITE, MAP_ANON|MAP_FIXED|MAP_PRIVATE, -1, 0);
}
mem_darwin.c
void runtime·SysMap(void *v, uintptr n, bool reserved, uint64 *stat)
{
p = runtime·mmap(v, n, PROT_READ|PROT_WRITE, MAP_ANON|MAP_FIXED|MAP_PRIVATE, -1, 0);
}
至此,對(duì)象內(nèi)存分配和內(nèi)存擴(kuò)展的步驟結(jié)束。
垃圾回收器通過調(diào)用 MSpan_Sweep 函數(shù)完成內(nèi)存回收操作。
mgc0.c
bool runtime·MSpan_Sweep(MSpan *s, bool preserve)
{
// 當(dāng)前垃圾回收代齡。
sweepgen = runtime·mheap.sweepgen;
arena_start = runtime·mheap.arena_start;
// 獲取 span 相關(guān)信息。
cl = s->sizeclass;
size = s->elemsize;
if(cl == 0) {
// 大對(duì)象。
n = 1;
} else {
// 小對(duì)象。
npages = runtime·class_to_allocnpages[cl];
n = (npages << PageShift) / size;
}
res = false;
nfree = 0;
end = &head;
c = g->m->mcache;
sweepgenset = false;
// 標(biāo)記 freelist 里的 object,這些對(duì)象未被使用,無需再次檢查。
for(link = s->freelist; link != nil; link = link->next) {
off = (uintptr*)link - (uintptr*)arena_start;
bitp = arena_start - off/wordsPerBitmapByte - 1;
shift = (off % wordsPerBitmapByte) * gcBits;
*bitp |= bitMarked<<shift;
}
// 釋放 finalizer、profiler 關(guān)聯(lián)對(duì)象。
specialp = &s->specials;
special = *specialp;
while(special != nil) {
// ...
}
// 計(jì)算標(biāo)記位開始位置。
p = (byte*)(s->start << PageShift);
off = (uintptr*)p - (uintptr*)arena_start;
bitp = arena_start - off/wordsPerBitmapByte - 1;
shift = 0;
step = size/(PtrSize*wordsPerBitmapByte);
bitp += step;
if(step == 0) {
// 8-byte objects.
bitp++;
shift = gcBits;
}
// 遍歷該 span 所有 object。
for(; n > 0; n--, p += size) {
// 獲取標(biāo)記位。
bitp -= step;
if(step == 0) {
if(shift != 0)
bitp--;
shift = gcBits - shift;
}
xbits = *bitp;
bits = (xbits>>shift) & bitMask;
// 如果 object 對(duì)象標(biāo)記為可達(dá) (Marked),則跳過。
// 包括 freelist 里的未使用對(duì)象。
if((bits&bitMarked) != 0) {
*bitp &= ~(bitMarked<<shift);
continue;
}
// 重置標(biāo)記位。
*bitp = (xbits & ~((bitMarked|(BitsMask<<2))<<shift)) |
((uintptr)BitsDead<<(shift+2));
if(cl == 0) { // 大對(duì)象。
// 清除全部標(biāo)記位。
runtime·unmarkspan(p, s->npages<<PageShift);
// 重置代齡。
runtime·atomicstore(&s->sweepgen, sweepgen);
sweepgenset = true;
if(runtime·debug.efence) {
// ...
} else
// 將大對(duì)象所使用的 span 歸還給 heap。
runtime·MHeap_Free(&runtime·mheap, s, 1);
// 調(diào)整 next_gc 閾值。
runtime·xadd64(&mstats.next_gc,
-(uint64)(size * (runtime·gcpercent + 100)/100));
res = true;
} else { // 小對(duì)象。
// 將可回收對(duì)象添加到一個(gè)鏈表中。
end->next = (MLink*)p;
end = (MLink*)p;
nfree++;
}
}
// 如可回收小對(duì)象數(shù)量大于0。
if(nfree > 0) {
// 調(diào)整 next_gc 閾值。
runtime·xadd64(&mstats.next_gc,
-(uint64)(nfree * size * (runtime·gcpercent + 100)/100));
// 釋放收集的 object 鏈表。
res = runtime·MCentral_FreeSpan(&runtime·mheap.central[cl].mcentral, s, nfree,head.next, end, preserve);
}
return res;
}
該回收函數(shù)在分配流程 CacheSpan 中也曾提及過。
大對(duì)象釋放很簡(jiǎn)單,調(diào)用 FreeSpanLocked 將 span 重新放回 heap 管理鏈表即可。
mheap.c
void runtime·MHeap_Free(MHeap *h, MSpan *s, int32 acct)
{
mheap_free(h, s, acct);
}
static void mheap_free(MHeap *h, MSpan *s, int32 acct)
{
MHeap_FreeSpanLocked(h, s, true, true);
}
至于收集的所有小對(duì)象,會(huì)被追加到 span.freelist 鏈表。如該 span 收回全部 object,則也將其歸還給 heap。
mcentral.c
bool runtime·MCentral_FreeSpan(MCentral *c, MSpan *s, int32 n, MLink *start, ...)
{
// span 不能是 cache 正在使用的對(duì)象。
if(s->incache)
runtime·throw("freespan into cached span");
// 將收集的 object 鏈表追加到 span.freelist。
wasempty = s->freelist == nil;
end->next = s->freelist;
s->freelist = start;
s->ref -= n;
// 將 span 轉(zhuǎn)移到 central.nonempty 鏈表。
if(wasempty) {
runtime·MSpanList_Remove(s);
runtime·MSpanList_Insert(&c->nonempty, s);
}
// 重置回收代齡。
runtime·atomicstore(&s->sweepgen, runtime·mheap.sweepgen);
if(s->ref != 0) {
return false;
}
// 如果 span 收回全部 object (span.ref == 0),從 central 管理鏈表移除。
runtime·MSpanList_Remove(s);
s->needzero = 1;
s->freelist = nil;
// 清除標(biāo)記位。
runtime·unmarkspan((byte*)(s->start<<PageShift), s->npages<<PageShift);
// 將 span 交還給 heap。
runtime·MHeap_Free(&runtime·mheap, s, 0);
return true;
}
釋放操作最終結(jié)果,僅僅是將可回收對(duì)象歸還給 span.freelist 或 heap.free 鏈表,以便后續(xù)分配操作復(fù)用。至于物理內(nèi)存釋放,則由垃圾回收器的特殊定時(shí)操作完成。
除了用戶內(nèi)存,分配器還需額外的 span、cache 等對(duì)象來維持系統(tǒng)運(yùn)轉(zhuǎn)。這些管理對(duì)象所需內(nèi)存不從 arena 區(qū)域分配,不占用與 GC Heap 分配算法有關(guān)的內(nèi)存地址。
系統(tǒng)為每種管理對(duì)象初始化一個(gè)固定分配器 FixAlloc。
malloc.h
struct FixAlloc
{
uintptr size; // 固定分配長(zhǎng)度。
void (*first)(void *arg, byte *p); // 關(guān)聯(lián)函數(shù)。
void* arg; // first 函數(shù)調(diào)用參數(shù)。
MLink* list; // 可復(fù)用空間鏈表。
byte* chunk; // 后備內(nèi)存塊當(dāng)前分配指針。
uint32 nchunk; // 后備內(nèi)存塊可用長(zhǎng)度。
uintptr inuse; // 后備內(nèi)存塊已使用長(zhǎng)度。
};
mheap.c
void runtime·MHeap_Init(MHeap *h)
{
runtime·FixAlloc_Init(&h->spanalloc, sizeof(MSpan), RecordSpan, ...);
runtime·FixAlloc_Init(&h->cachealloc, sizeof(MCache), nil, ...);
runtime·FixAlloc_Init(&h->specialfinalizeralloc, sizeof(SpecialFinalizer), ...);
runtime·FixAlloc_Init(&h->specialprofilealloc, sizeof(SpecialProfile), ...);
}
FixAlloc 初始化過程很簡(jiǎn)單。
mfixalloc.c
void runtime·FixAlloc_Init(FixAlloc *f, uintptr size,
void (*first)(void*, byte*), void *arg, uint64 *stat)
{
f->size = size;
f->first = first;
f->arg = arg;
f->list = nil;
f->chunk = nil;
f->nchunk = 0;
f->inuse = 0;
f->stat = stat;
}
分配算法和 cache 類似。首先從復(fù)用鏈表提取,如果沒找到,就從后備內(nèi)存塊截取。
malloc.h
FixAllocChunk = 16<<10," " // Chunk size for FixAlloc
mfixalloc.c
void* runtime·FixAlloc_Alloc(FixAlloc *f)
{
void *v;
// 如果空閑鏈表不為空,直接從鏈表提取。
if(f->list) {
v = f->list;
f->list = *(void**)f->list;
f->inuse += f->size;
return v;
}
// 如果后備內(nèi)存塊空間不足...
if(f->nchunk < f->size) {
// 重新申請(qǐng) 16KB 后備內(nèi)存。
f->chunk = runtime·persistentalloc(FixAllocChunk, 0, f->stat);
f->nchunk = FixAllocChunk;
}
// 從后備內(nèi)存塊截取。
v = f->chunk;
// 執(zhí)行 first 函數(shù)。
if(f->first)
f->first(f->arg, v);
// 調(diào)整剩余后備塊參數(shù)。
f->chunk += f->size;
f->nchunk -= f->size;
f->inuse += f->size;
return v;
}
后備內(nèi)存塊策略有點(diǎn)類似 heap span,申請(qǐng)大塊內(nèi)存以減少系統(tǒng)調(diào)用開銷。實(shí)際上,不同類別的 FixAlloc 會(huì)共享一個(gè)超大塊內(nèi)存,稱之為 persistent。
malloc.go
var persistent struct { // 全局變量,為全部 FixAlloc 提供后備內(nèi)存塊。
lock mutex
pos unsafe.Pointer
end unsafe.Pointer
}
func persistentalloc(size, align uintptr, stat *uint64) unsafe.Pointer {
const (
chunk = 256 << 10
maxBlock = 64 << 10 // VM reservation granularity is 64K on windows
)
// 如果需要 64KB 以上,直接從 mmap 返回。
if size >= maxBlock {
return sysAlloc(size, stat)
}
// 對(duì)齊分配地址。
persistent.pos = roundup(persistent.pos, align)
// 如果剩余空間不足 ...
if uintptr(persistent.pos)+size > uintptr(persistent.end) {
// 重新從 mmap 申請(qǐng) 256KB 內(nèi)存,保存到 persistent。
persistent.pos = sysAlloc(chunk, &memstats.other_sys)
persistent.end = add(persistent.pos, chunk)
}
// 截取內(nèi)存,調(diào)整下次分配地址。
p := persistent.pos
persistent.pos = add(persistent.pos, size)
return p
}
mem_linux.c
void* runtime·sysAlloc(uintptr n, uint64 *stat)
{
p = runtime·mmap(nil, n, PROT_READ|PROT_WRITE, MAP_ANON|MAP_PRIVATE, -1, 0);
return p;
}
釋放操作僅僅是將對(duì)象收回到復(fù)用鏈表。
mfixalloc.c
void runtime·FixAlloc_Free(FixAlloc *f, void *p)
{
f->inuse -= f->size;
*(void**)p = f->list;
f->list = p;
}
另外,在 FixAlloc 初始化時(shí),還可額外提供一個(gè) first 函數(shù)作為參數(shù),比如 spanalloc 中的 RecordSpan。
該函數(shù)為 heap.allspans 分配內(nèi)存,其內(nèi)存儲(chǔ)了所有 span 指針,GC Sweep 和 Heap Dump 操作都會(huì)用到這些信息。
mheap.c
static void RecordSpan(void *vh, byte *p)
{
MHeap *h;
MSpan *s;
MSpan **all;
uint32 cap;
h = vh;
s = (MSpan*)p;
// 如果空間不足 ...
if(h->nspan >= h->nspancap) {
// 計(jì)算新容量。
cap = 64*1024/sizeof(all[0]);
if(cap < h->nspancap*3/2)
cap = h->nspancap*3/2;
// 分配新空間。
all = (MSpan**)runtime·sysAlloc(cap*sizeof(all[0]), &mstats.other_sys);
if(h->allspans) {
// 將數(shù)據(jù)拷貝到新分配空間。
runtime·memmove(all, h->allspans, h->nspancap*sizeof(all[0]));
// 釋放原內(nèi)存。
if(h->allspans != runtime·mheap.gcspans)
runtime·SysFree(h->allspans, h->nspancap*sizeof(all[0]),
&mstats.other_sys);
}
// 指向新內(nèi)存空間。
h->allspans = all;
h->nspancap = cap;
}
// 存儲(chǔ) span 指針。
h->allspans[h->nspan++] = s;
}
精確垃圾回收,很經(jīng)典的 Mark-and-Sweep 算法。
當(dāng)分配 (malloc) 總量超出預(yù)設(shè)閾值,就會(huì)引發(fā)垃圾回收。操作前,須暫停用戶邏輯執(zhí)行(StopTheWorld),然后啟用多個(gè)線程執(zhí)行并行掃描工作,直到標(biāo)記出所有可回收對(duì)象。
從 Go 1.3 開始,默認(rèn)采用并發(fā)內(nèi)存清理模式。也就是說,標(biāo)記結(jié)束后,立即恢復(fù)邏輯執(zhí)行 (StartTheWorld)。用一個(gè)專門的 goroutine 在后臺(tái)清理內(nèi)存。這縮短了暫停時(shí)間,在一定程度上改善了垃圾回收所引發(fā)的問題。
完成清理后,新閾值通常是存活對(duì)象所用內(nèi)存的 2 倍。需要注意的是,清理操作只是調(diào)用內(nèi)存分配器的相關(guān)方法,收回不可達(dá)對(duì)象內(nèi)存進(jìn)行復(fù)用,并未釋放物理內(nèi)存。
物理內(nèi)存釋放由專門線程定期執(zhí)行。它檢查最后一次垃圾回收時(shí)間,如超過 2 分鐘,則執(zhí)行強(qiáng)制回收。還會(huì)讓操作系統(tǒng)收回閑置超過 5 分鐘的 span 物理內(nèi)存。
初始化函數(shù)創(chuàng)建并行標(biāo)記狀態(tài)對(duì)象 markfor,讀取 GOGC 環(huán)境變量值。
proc.c
void runtime·schedinit(void)
{
runtime·gcinit();
}
mgc0.c
void runtime·gcinit(void)
{
runtime·work.markfor = runtime·parforalloc(MaxGcproc);
runtime·gcpercent = runtime·readgogc();
}
int32 runtime·readgogc(void)
{
byte *p;
p = runtime·getenv("GOGC");
// 默認(rèn)值 100。
if(p == nil || p[0] == '\0')
return 100;
// 關(guān)閉垃圾回收。
if(runtime·strcmp(p, (byte*)"off") == 0)
return -1;
return runtime·atoi(p);
}
在內(nèi)存分配器中提到過,函數(shù) mallocgc 會(huì)檢查已分配內(nèi)存是否超過閾值,并以此觸發(fā)垃圾回收操作。
malloc.go
func mallocgc(size uintptr, typ *_type, flags uint32) unsafe.Pointer {
if memstats.heap_alloc >= memstats.next_gc {
gogc(0)
}
}
啟動(dòng)垃圾回收有三種不同方式。
malloc.go
func gogc(force int32) {
// 如果 GOGC < 0,禁用垃圾回收,直接返回。
if gp := getg(); gp == mp.g0 || mp.locks > 1 || !memstats.enablegc ||
panicking != 0 || gcpercent < 0 {
return
}
semacquire(&worldsema, false)
// 普通回收,會(huì)再次檢查是否達(dá)到回收閾值。
if force == 0 && memstats.heap_alloc < memstats.next_gc {
semrelease(&worldsema)
return
}
// 準(zhǔn)備回收 ...
startTime := nanotime()
mp = acquirem()
mp.gcing = 1
// 停止用戶邏輯執(zhí)行。
onM(stoptheworld)
// 清理 sync.Pool 的相關(guān)緩存對(duì)象,這個(gè)后面有專門的剖析章節(jié)。
clearpools()
// 如果設(shè)置環(huán)境變量 GODEBUG=gctrace=2,那么會(huì)引發(fā)兩次回收操作。
n := 1
if debug.gctrace > 1 {
n = 2
}
for i := 0; i < n; i++ {
if i > 0 {
startTime = nanotime()
}
// 將 64-bit 開始時(shí)間保存到 scalararg 。
mp.scalararg[0] = uintptr(uint32(startTime)) // low 32 bits
mp.scalararg[1] = uintptr(startTime >> 32) // high 32 bits
// 清理行為標(biāo)記。
if force >= 2 {
mp.scalararg[2] = 1 // eagersweep
} else {
mp.scalararg[2] = 0
}
// 在 g0 棧執(zhí)行垃圾回收操作。
onM(gc_m)
}
// 回收結(jié)束。
mp.gcing = 0
semrelease(&worldsema)
// 恢復(fù)用戶邏輯執(zhí)行。
onM(starttheworld)
}
總體邏輯倒不復(fù)雜,StopTheWorld -> GC -> StartTheWorld。暫時(shí)拋開周邊細(xì)節(jié),看看垃圾回收流程。
mgc0.c
void runtime·gc_m(void)
{
a.start_time = (uint64)(g->m->scalararg[0]) | ((uint64)(g->m->scalararg[1]) << 32);
a.eagersweep = g->m->scalararg[2];
gc(&a);
}
static void gc(struct gc_args *args)
{
// 如果前次回收的清理操作未完成,那么先把這事結(jié)束了。
while(runtime·sweepone() != -1)
runtime·sweep.npausesweep++;
// 為回收操作準(zhǔn)備相關(guān)環(huán)境狀態(tài)。
runtime·mheap.gcspans = runtime·mheap.allspans;
runtime·work.spans = runtime·mheap.allspans;
runtime·work.nspan = runtime·mheap.nspan;
runtime·work.nwait = 0;
runtime·work.ndone = 0;
runtime·work.nproc = runtime·gcprocs();
// 初始化并行標(biāo)記狀態(tài)對(duì)象 markfor。
// 使用 nproc 個(gè)線程執(zhí)行并行標(biāo)記任務(wù)。
// 任務(wù)總數(shù) = 固定內(nèi)存段(RootCount) + 當(dāng)前 goroutine G 的數(shù)量。
// 標(biāo)記函數(shù) markroot。
runtime·parforsetup(runtime·work.markfor, runtime·work.nproc,
RootCount + runtime·allglen, nil, false, markroot);
if(runtime·work.nproc > 1) {
// 重置結(jié)束標(biāo)記。
runtime·noteclear(&runtime·work.alldone);
// 喚醒 nproc - 1 個(gè)線程準(zhǔn)備執(zhí)行 markroot 函數(shù),因?yàn)楫?dāng)前線程也會(huì)參與標(biāo)記工作。
runtime·helpgc(runtime·work.nproc);
}
// 讓當(dāng)前線程也開始執(zhí)行標(biāo)記任務(wù)。
gchelperstart();
runtime·parfordo(runtime·work.markfor);
scanblock(nil, 0, nil);
if(runtime·work.nproc > 1)
// 休眠,等待標(biāo)記全部結(jié)束。
runtime·notesleep(&runtime·work.alldone);
// 收縮 stack 內(nèi)存。
runtime·shrinkfinish();
// 更新所有 cache 統(tǒng)計(jì)參數(shù)。
cachestats();
// 計(jì)算上一次回收后 heap_alloc 大小。
// 當(dāng)前 next_gc = heap0 + heap0 * (gcpercent/100)
// 那么 heap0 = next_gc / (1 + gcpercent/100)
heap0 = mstats.next_gc*100/(runtime·gcpercent+100);
// 計(jì)算下一次 next_gc 閾值。
// 這個(gè)值只是預(yù)估,會(huì)隨著清理操作而改變。
mstats.next_gc = mstats.heap_alloc+mstats.heap_alloc*runtime·gcpercent/100;
runtime·atomicstore64(&mstats.last_gc, runtime·unixnanotime());
// 目標(biāo)是 heap.allspans 里的所有 span 對(duì)象。
runtime·mheap.gcspans = runtime·mheap.allspans;
// GC 使用遞增的代齡來表示 span 當(dāng)前回收狀態(tài)。
runtime·mheap.sweepgen += 2;
runtime·mheap.sweepdone = false;
runtime·work.spans = runtime·mheap.allspans;
runtime·work.nspan = runtime·mheap.nspan;
runtime·sweep.spanidx = 0;
if(ConcurrentSweep && !args->eagersweep) { // 并發(fā)清理
// 新建或喚醒用于清理操作的 goroutine。
if(runtime·sweep.g == nil)
runtime·sweep.g = runtime·newproc1(&bgsweepv, nil, 0, 0, gc);
else if(runtime·sweep.parked) {
runtime·sweep.parked = false;
runtime·ready(runtime·sweep.g); // 喚醒
}
} else { // 串行回收
// 立即執(zhí)行清理操作。
while(runtime·sweepone() != -1)
runtime·sweep.npausesweep++;
}
}
算法的核心是并行回收和是否啟用一個(gè) goroutine 來執(zhí)行清理操作。這個(gè) goroutine 在清理操作結(jié)束后被凍結(jié),再次使用前必須喚醒。
如果用專門的 goroutine 執(zhí)行清理操作,那么 gc 函數(shù)不等清理操作結(jié)束就立即返回,上級(jí)的 gogc 會(huì)立即調(diào)用 StartTheWorld 恢復(fù)用戶邏輯執(zhí)行,這就是并發(fā)回收的關(guān)鍵。
我們回過頭,看看一些中間環(huán)節(jié)的實(shí)現(xiàn)細(xì)節(jié)。
在設(shè)置并行回收狀態(tài)對(duì)象 markfor 里提到過兩個(gè)參數(shù):任務(wù)總數(shù)和標(biāo)記函數(shù)。
mgc0.c
enum {
RootCount = 5
}
任務(wù)總數(shù)其實(shí)是 5 個(gè)根內(nèi)存段 RootData、RootBBS、RootFinalizers、RootSpans、RootFlushCaches,外加所有 goroutine stack 的總和。
mgc0.c
static void markroot(ParFor *desc, uint32 i)
{
switch(i) {
case RootData:
...
break;
case RootBss:
...
break;
case RootFinalizers:
...
break;
case RootSpans:
...
break;
case RootFlushCaches:
flushallmcaches(); // 清理 cache、stack。
break;
default:
gp = runtime·allg[i - RootCount];
runtime·shrinkstack(gp); // 收縮 stack。
scanstack(gp);
...
break;
}
}
核心算法 scanblock 函數(shù)通過掃描內(nèi)存塊,找出存活對(duì)象和可回收對(duì)象,并在 bitmap 區(qū)域進(jìn)行標(biāo)記。具體實(shí)現(xiàn)細(xì)節(jié),本文不做詳述,有興趣可自行閱讀源碼或相關(guān)論文。
那么 parfor 是如何實(shí)現(xiàn)并行回收的呢?
這里面有個(gè)很大誤導(dǎo)。其實(shí) parfor 實(shí)現(xiàn)非常簡(jiǎn)單,僅是一個(gè)狀態(tài)對(duì)象,核心是將要執(zhí)行的多個(gè)任務(wù)序號(hào)平均分配個(gè)多個(gè)線程。
parfor.c
struct ParForThread
{
// the thread's iteration space [32lsb, 32msb)
uint64 pos;
};
void runtime·parforsetup(ParFor *desc, uint32 nthr, uint32 n, void *ctx, bool wait,
void (*body)(ParFor*, uint32))
{
uint32 i, begin, end;
uint64 *pos;
desc->body = body; // 任務(wù)函數(shù)
desc->nthr = nthr; // 并發(fā)線程數(shù)量
desc->thrseq = 0;
desc->cnt = n; // 任務(wù)總數(shù)
// 為線程平均分配任務(wù)編號(hào)。
// 比如 10 個(gè)任務(wù)分配給 5 個(gè)線程,那么 thr[0] 就是 [0,2),也就是 0 和 1 這兩個(gè)任務(wù)。
// 起始和結(jié)束編號(hào)分別保存在 ParForThread.pos 字段的高低位。
for(i=0; i<nthr; i++) {
begin = (uint64)n*i / nthr;
end = (uint64)n*(i+1) / nthr;
pos = &desc->thr[i].pos;
*pos = (uint64)begin | (((uint64)end)<<32);
}
}
現(xiàn)在任務(wù)被平均分配,并保存到全局變量 markfor 里。接下來的操作,其實(shí)是由被喚醒的線程主動(dòng)完成,如同當(dāng)前 GC 主線程主動(dòng)調(diào)用 parfordo 一樣。
執(zhí)行標(biāo)記任務(wù)的多個(gè)線程由 helpgc 函數(shù)喚醒,其中的關(guān)鍵就是設(shè)置 M.helpgc 標(biāo)記。
proc.c
void runtime·helpgc(int32 nproc)
{
pos = 0;
// 從 1 開始,因?yàn)楫?dāng)前線程也會(huì)參與標(biāo)記任務(wù)。
for(n = 1; n < nproc; n++) {
// 檢查 P 是否被當(dāng)前線程使用,如果是就跳過。
if(runtime·allp[pos]->mcache == g->m->mcache)
pos++;
// 獲取空閑線程。
mp = mget();
// 這是關(guān)鍵,線程喚醒后會(huì)檢查該標(biāo)記。
mp->helpgc = n;
// 為線程分配用戶執(zhí)行的 P.cache。
mp->mcache = runtime·allp[pos]->mcache;
pos++;
// 喚醒線程。
runtime·notewakeup(&mp->park);
}
}
如果你熟悉線程 M 的工作方式,那么就會(huì)知道它通過 stopm 完成休眠操作。
proc.c
static void stopm(void)
{
// 放回空閑隊(duì)列。
mput(g->m);
// 休眠,直到被喚醒。
runtime·notesleep(&g->m->park);
// 被喚醒后,清除休眠標(biāo)記。
runtime·noteclear(&g->m->park);
// 檢查 helpgc 標(biāo)記,執(zhí)行 gchelper 函數(shù)。
if(g->m->helpgc) {
runtime·gchelper();
g->m->helpgc = 0;
g->m->mcache = nil;
goto retry;
}
}
mgc0.c
void runtime·gchelper(void)
{
gchelperstart();
runtime·parfordo(runtime·work.markfor);
scanblock(nil, 0, nil);
// 檢查標(biāo)記是否全部結(jié)束。
nproc = runtime·work.nproc;
if(runtime·xadd(&runtime·work.ndone, +1) == nproc-1)
// 喚醒 GC 主線程。
runtime·notewakeup(&runtime·work.alldone);
g->m->traceback = 0;
}
最終和 GC 主線程調(diào)用過程一致。當(dāng) alldone 被喚醒后,GC 主線程恢復(fù)后續(xù)步驟執(zhí)行。
至于被線程調(diào)用的 parfordo,其實(shí)也很簡(jiǎn)單。
parfor.c
void runtime·parfordo(ParFor *desc)
{
// 每次調(diào)用,都會(huì)遞增 thrseq 值。
tid = runtime·xadd(&desc->thrseq, 1) - 1;
// 如果任務(wù)線程數(shù)量為 1,那么沒什么好說的,直接循環(huán)執(zhí)行 body,也就是 markroot。
if(desc->nthr==1) {
for(i=0; i<desc->cnt; i++)
desc->body(desc, i);
return;
}
body = desc->body;
// 用 tid 作為當(dāng)前線程的編號(hào),以此提取任務(wù)范圍值。
me = &desc->thr[tid];
mypos = &me->pos;
for(;;) {
// 先完成自己的任務(wù)。
for(;;) {
// 遞增當(dāng)前任務(wù)范圍的開始編號(hào)。
pos = runtime·xadd64(mypos, 1);
// 注意:只有低32位被修改,高32位結(jié)束編號(hào)不變。
begin = (uint32)pos-1;
end = (uint32)(pos>>32);
// 如果小于結(jié)束編號(hào),循環(huán)。
if(begin < end) {
// 執(zhí)行 markroot 標(biāo)記函數(shù)。
body(desc, begin);
continue;
}
break;
}
// 嘗試從其他線程偷點(diǎn)任務(wù)過來,以便盡快完成所有標(biāo)記操作。
idle = false;
for(try=0;; try++) {
// 如果長(zhǎng)時(shí)間沒有偷到任務(wù),設(shè)置結(jié)束標(biāo)記。
// increment the done counter...
if(try > desc->nthr*4 && !idle) {
idle = true;
runtime·xadd(&desc->done, 1);
}
// 如果所有線程都結(jié)束,那么退出。
if(desc->done + !idle == desc->nthr) {
if(!idle)
runtime·xadd(&desc->done, 1);
goto exit;
}
// 隨機(jī)選擇一個(gè)線程任務(wù)。
victim = runtime·fastrand1() % (desc->nthr-1);
if(victim >= tid)
victim++;
victimpos = &desc->thr[victim].pos;
for(;;) {
// 偷取任務(wù)。
pos = runtime·atomicload64(victimpos);
begin = (uint32)pos;
end = (uint32)(pos>>32);
if(begin+1 >= end) {
begin = end = 0;
break;
}
if(idle) {
runtime·xadd(&desc->done, -1);
idle = false;
}
begin2 = begin + (end-begin)/2;
newpos = (uint64)begin | (uint64)begin2<<32;
if(runtime·cas64(victimpos, pos, newpos)) {
begin = begin2;
break;
}
}
// 成功偷到任務(wù)...
if(begin < end) {
// 添加到自己的任務(wù)列表中。
runtime·atomicstore64(mypos, (uint64)begin | (uint64)end<<32);
// 返回外層循環(huán),上面的任務(wù)處理代碼再次被激活。
break;
}
// ...
}
}
exit:
// ...
}
每個(gè)線程調(diào)用 parfordo 的時(shí)候,都拿到一個(gè)遞增的唯一 thrseq 編號(hào),并以此獲得事先由 parforsetup 分配好的任務(wù)段。接下來,自然是該線程循環(huán)執(zhí)行分配給自己的所有任務(wù),任務(wù)編號(hào)被傳遞給 markroot 作為選擇目標(biāo)的判斷條件。
在完成自己的任務(wù)后,嘗試分擔(dān)其他線程任務(wù),以盡快完成全部任務(wù)。這種 steal 算法,在運(yùn)行時(shí)的很多地方都有體現(xiàn),算是并行開發(fā)的一個(gè) “標(biāo)準(zhǔn)” 做法了。
至此,并行標(biāo)記的所有秘密被揭開,我們繼續(xù)探究清理操作過程。
不管是并發(fā)還是串行清理,最終都是調(diào)用 sweepone 函數(shù)。
mgc0.c
static FuncVal bgsweepv = {runtime·bgsweep};
mgc0.go
func bgsweep() {
for {
for gosweepone() != ^uintptr(0) {
sweep.nbgsweep++
Gosched()
}
if !gosweepdone() {
continue
}
// 設(shè)置休眠標(biāo)志。
sweep.parked = true
// 休眠當(dāng)前清理 goroutine。
goparkunlock(&gclock, "GC sweep wait")
}
}
mgc0.c
uintptr runtime·gosweepone(void)
{
void (*fn)(void);
fn = sweepone_m;
runtime·onM(&fn);
return g->m->scalararg[0];
}
static void sweepone_m(void)
{
g->m->scalararg[0] = runtime·sweepone();
}
清理函數(shù)實(shí)現(xiàn)很簡(jiǎn)潔,每次找到一個(gè)待清理的 span,然后調(diào)用 span_sweep 收回對(duì)應(yīng)的內(nèi)存,這在內(nèi)存分配器的釋放過程中已經(jīng)說得很清楚了。
mgc0.c
uintptr runtime·sweepone(void)
{
// 當(dāng)前代齡,清理前 += 2。
sg = runtime·mheap.sweepgen;
// 循環(huán)掃描所有 spans。
for(;;) {
idx = runtime·xadd(&runtime·sweep.spanidx, 1) - 1;
// 結(jié)束判斷。
if(idx >= runtime·work.nspan) {
runtime·mheap.sweepdone = true;
return -1;
}
// 獲取 span。
s = runtime·work.spans[idx];
// 如果不是正在使用的 span,無需清理。
if(s->state != MSpanInUse) {
s->sweepgen = sg;
continue;
}
// 如果不是待清理 span,跳過。
if(s->sweepgen != sg-2 || !runtime·cas(&s->sweepgen, sg-2, sg-1))
continue;
npages = s->npages;
// 清理。
if(!runtime·MSpan_Sweep(s, false))
npages = 0;
return npages;
}
}
最后剩下的,就是 StopTheWorld 和 StartTheWorld 如何停止和恢復(fù)用戶邏輯執(zhí)行。因這會(huì)涉及一些 Goroutine Scheduler 知識(shí),您可以暫時(shí)跳過,等看完后面的相關(guān)章節(jié)再回頭研究。
proc.c
void runtime·stoptheworld(void)
{
runtime·lock(&runtime·sched.lock);
// 計(jì)數(shù)器。
runtime·sched.stopwait = runtime·gomaxprocs;
// 設(shè)置關(guān)鍵停止標(biāo)記。
runtime·atomicstore((uint32*)&runtime·sched.gcwaiting, 1);
// 在所有運(yùn)行的 goroutine 上設(shè)置搶占標(biāo)志。
preemptall();
// 設(shè)置當(dāng)前 P 的狀態(tài)。
g->m->p->status = Pgcstop; // Pgcstop is only diagnostic.
runtime·sched.stopwait--;
// 設(shè)置所有處理系統(tǒng)調(diào)用 P 的狀態(tài)。
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
s = p->status;
if(s == Psyscall && runtime·cas(&p->status, s, Pgcstop))
runtime·sched.stopwait--;
}
// 設(shè)置所有空閑 P 狀態(tài)。
while(p = pidleget()) {
p->status = Pgcstop;
runtime·sched.stopwait--;
}
wait = runtime·sched.stopwait > 0;
runtime·unlock(&runtime·sched.lock);
// 等待所有 P 停止。
if(wait) {
for(;;) {
// 等待 100us,直到休眠標(biāo)記被喚醒。
if(runtime·notetsleep(&runtime·sched.stopnote, 100*1000)) {
// 清除休眠標(biāo)記。
runtime·noteclear(&runtime·sched.stopnote);
break;
}
// 再次發(fā)出搶占標(biāo)記。
preemptall();
}
}
}
從代碼上來看,StopTheWorld 只是設(shè)置了一些標(biāo)記,包括搶占行為也不過是在在運(yùn)行的 goroutine 上設(shè)置搶占標(biāo)記。具體這些標(biāo)記是如何讓正在運(yùn)行的 goroutine 暫停的呢?
如果了解 goroutine 運(yùn)行機(jī)制,必然知道它總是循環(huán)執(zhí)行 schedule 函數(shù),在這個(gè)函數(shù)頭部會(huì)檢查 gcwaiting 標(biāo)記,并以此停止當(dāng)前任務(wù)執(zhí)行。
proc.c
static void schedule(void)
{
// 檢查 gcwaiting 標(biāo)記,停止當(dāng)前任務(wù)執(zhí)行。
if(runtime·sched.gcwaiting) {
gcstopm();
}
...
}
static void gcstopm(void)
{
// 釋放關(guān)聯(lián)的 P。
p = releasep();
runtime·lock(&runtime·sched.lock);
p->status = Pgcstop;
// 遞減計(jì)數(shù)器,直到喚醒.
if(--runtime·sched.stopwait == 0)
runtime·notewakeup(&runtime·sched.stopnote);
runtime·unlock(&runtime·sched.lock);
stopm();
}
這樣一來,所有正在執(zhí)行的 goroutine 會(huì)被放回隊(duì)列,相關(guān)任務(wù)線程也被休眠。至于發(fā)出搶占標(biāo)記,是為了讓一直處于忙碌狀態(tài)的 goroutine 有機(jī)會(huì)檢查停止標(biāo)記。
反過來,StartTheWorld 就是恢復(fù)這些被停止的任務(wù),并喚醒線程繼續(xù)執(zhí)行。
proc.c
void runtime·starttheworld(void)
{
...
// 重置標(biāo)記。
runtime·sched.gcwaiting = 0;
p1 = nil;
// 循環(huán)所有 P。
while(p = pidleget()) {
// 如果該 P 沒有任務(wù),那么放回空閑隊(duì)列。
// 因?yàn)闆]有任務(wù)的 P 被放在列表尾部,故無需繼續(xù)遍歷。
if(p->runqhead == p->runqtail) {
pidleput(p);
break;
}
// 關(guān)聯(lián)一個(gè)空閑 M 線程。
p->m = mget();
// 將準(zhǔn)備工作的 P 串成鏈表。
p->link = p1;
p1 = p;
}
// 喚醒 sysmon。
if(runtime·sched.sysmonwait) {
runtime·sched.sysmonwait = false;
runtime·notewakeup(&runtime·sched.sysmonnote);
}
runtime·unlock(&runtime·sched.lock);
// 遍歷準(zhǔn)備工作的 P。
while(p1) {
p = p1;
p1 = p1->link;
// 檢查并喚醒關(guān)聯(lián)線程 M。
if(p->m) {
mp = p->m;
runtime·notewakeup(&mp->park);
} else {
// 如果沒有關(guān)聯(lián)線程,新建。
newm(nil, p);
add = false;
}
}
// ...
}
垃圾回收操作雖然關(guān)聯(lián)很多東西,但我們基本理清了它的運(yùn)作流程。如同在分配器一章中所說,垃圾回收只是將回收內(nèi)存,并沒有釋放空閑的物理內(nèi)存。
在 main goroutine 入口,運(yùn)行時(shí)使用一個(gè)專用線程運(yùn)行 sysmon 操作。
proc.go
// The main goroutine.
func main() {
...
onM(newsysmon)
...
main_init()
main_main()
}
proc.c
void runtime·newsysmon(void)
{
newm(sysmon, nil);
}
在 sysmon 里面會(huì)定時(shí)啟動(dòng)強(qiáng)制垃圾回收和物理內(nèi)存釋放操作。
proc.c
static void sysmon(void)
{
// 如果超過 2 分鐘沒有運(yùn)行 gc,則強(qiáng)制回收。
forcegcperiod = 2*60*1e9;
// 如果空閑 span 超過 5 分鐘未被使用,則釋放其關(guān)聯(lián)物理內(nèi)存。
scavengelimit = 5*60*1e9;
for(;;) {
runtime·usleep(delay);
// 啟動(dòng)強(qiáng)制垃圾回收。
lastgc = runtime·atomicload64(&mstats.last_gc);
if(lastgc != 0 && unixnow - lastgc > forcegcperiod && ...) {
runtime·forcegc.idle = 0;
runtime·forcegc.g->schedlink = nil;
// 將強(qiáng)制垃圾回收 goroutine 放回任務(wù)隊(duì)列。
injectglist(runtime·forcegc.g);
}
// 啟動(dòng)物理內(nèi)存釋放操作。
if(lastscavenge + scavengelimit/2 < now) {
runtime·MHeap_Scavenge(nscavenge, now, scavengelimit);
lastscavenge = now;
nscavenge++; // 計(jì)數(shù)器。
}
}
}
先說強(qiáng)制垃圾回收操作,這個(gè)神秘的 forcegc.g 從何而來?
proc.go
// start forcegc helper goroutine
func init() {
go forcegchelper()
}
依照 Go 語言規(guī)則,這個(gè) init 初始化函數(shù)會(huì)被 main goroutine 執(zhí)行,它創(chuàng)建了一個(gè)用來執(zhí)行強(qiáng)制回收操作的 goroutine。
proc.go
func forcegchelper() {
forcegc.g = getg()
forcegc.g.issystem = true
for {
// 休眠該 goroutine。
// park 會(huì)暫停 goroutine,但不會(huì)放回待運(yùn)行隊(duì)列。
goparkunlock(&forcegc.lock, "force gc (idle)")
// 喚醒后,執(zhí)行強(qiáng)制垃圾回收。
gogc(1)
}
}
這個(gè) forcegc.g 會(huì)循環(huán)執(zhí)行,每次完成后休眠,直到被 sysmon 重新返回任務(wù)隊(duì)列。
為什么要定期運(yùn)行強(qiáng)制回收?試想一下,假設(shè)回收后已分配內(nèi)存是 1GB,那么下次回收閾值就是 2GB,這可能導(dǎo)致很長(zhǎng)時(shí)間無法觸發(fā)回收操作。這就存在很大的內(nèi)存浪費(fèi),所以強(qiáng)制回收是非常必要的。
接下來看看如何釋放物理內(nèi)存,這是另外一個(gè)關(guān)注焦點(diǎn)。
heap.c
void runtime·MHeap_Scavenge(int32 k, uint64 now, uint64 limit)
{
h = &runtime·mheap;
// 保存本次釋放的物理內(nèi)存數(shù)量。
sumreleased = 0;
// 循環(huán)處理 heap.free 里的空閑 span。
for(i=0; i < nelem(h->free); i++)
sumreleased += scavengelist(&h->free[i], now, limit);
// 處理 heap.freelarge 里的空閑 span。
sumreleased += scavengelist(&h->freelarge, now, limit);
}
釋放操作的目標(biāo)自然是 heap 里的那些空閑 span 內(nèi)存塊。
mheap.c
static uintptr scavengelist(MSpan *list, uint64 now, uint64 limit)
{
sumreleased = 0;
// 遍歷 span 鏈表。
for(s=list->next; s != list; s=s->next) {
// 條件:
// 未使用時(shí)間超過 5 分鐘;
// 已釋放物理內(nèi)存頁數(shù)不等于 span 總頁數(shù) (未釋放或部分釋放);
if((now - s->unusedsince) > limit && s->npreleased != s->npages) {
// 待釋放頁數(shù)。為什么不是全部?
released = (s->npages - s->npreleased) << PageShift;
mstats.heap_released += released;
sumreleased += released;
// 現(xiàn)在整個(gè) span.npages 都會(huì)被釋放。
s->npreleased = s->npages;
runtime·SysUnused((void*)(s->start << PageShift), s->npages << PageShift);
}
}
return sumreleased;
}
至于 npreleased != npages 的問題,先得看看 SysUnused 做了什么。
mem_linux.c
void runtime·SysUnused(void *v, uintptr n)
{
runtime·madvise(v, n, MADV_DONTNEED);
}
mem_darwin.c
void runtime·SysUnused(void *v, uintptr n)
{
// Linux's MADV_DONTNEED is like BSD's MADV_FREE.
runtime·madvise(v, n, MADV_FREE);
}
對(duì) Linux、darwin 等系統(tǒng)而言,MADV_DONTNEED、MADV_FREE 告訴操作系統(tǒng),這段物理內(nèi)存暫時(shí)不用,可解除 MMU 映射。再次使用時(shí),由操作系統(tǒng)重新建立映射。
注意,盡管物理內(nèi)存被釋放了,但這個(gè) span 管理對(duì)象依舊存活,它所占用的虛擬內(nèi)存并未釋放,依然會(huì)和左右相鄰進(jìn)行合并。這就是 npreleased 可能不等于 npages 的關(guān)鍵。
另外,在 Windows 系統(tǒng)下,事情有點(diǎn)特殊,它不支持類似 MADV_DONTNEED 行為。
mem_windows.c
void runtime·SysUnused(void *v, uintptr n)
{
r = runtime·stdcall3(runtime·VirtualFree, (uintptr)v, n, MEM_DECOMMIT);
}
顯然,VirtualFree 會(huì)釋放掉 span 管理的虛擬內(nèi)存。因此,從 heap 獲取 span 時(shí)需要重新分配內(nèi)存。
mheap.c
static MSpan* MHeap_AllocSpanLocked(MHeap *h, uintptr npage)
{
if(s->npreleased > 0) {
runtime·SysUsed((void*)(s->start<<PageShift), s->npages<<PageShift);
mstats.heap_released -= s->npreleased<<PageShift;
s->npreleased = 0;
}
}
mem_windows.c
void runtime·SysUsed(void *v, uintptr n)
{
r = runtime·stdcall4(runtime·VirtualAlloc, (uintptr)v, n, MEM_COMMIT,
PAGE_READWRITE);
}
除了 Windows 系統(tǒng),其他 Unix-Like 系統(tǒng)的 SysUsed 什么都不做。
mem_linux.c
void runtime·SysUsed(void *v, uintptr n)
{
USED(v);
USED(n);
}
mem_darwin.c
void runtime·SysUsed(void *v, uintptr n)
{
USED(v);
USED(n);
}
除自動(dòng)回收外,還可手工調(diào)用 debug/FreeOSMemory 釋放物理內(nèi)存。
mgc0.go
func freeOSMemory() {
gogc(2) // force GC and do eager sweep
onM(scavenge_m)
}
mheap.c
void runtime·scavenge_m(void)
{
runtime·MHeap_Scavenge(-1, ~(uintptr)0, 0); // ~(uintptr)0 = 18446744073709551615
}
這個(gè)調(diào)用的參數(shù),now 是比當(dāng)前實(shí)際時(shí)間大得多的整數(shù),而 limit 是 0。這意味這所有的空閑 span 都過期,都會(huì)被釋放物理內(nèi)存。
與內(nèi)存和垃圾回收相關(guān)的狀態(tài)對(duì)象。
malloc.h
struct MStats
{
// General statistics.
uint64 alloc; // 正在使用的 object 容量 (malloc)。
uint64 total_alloc; // 歷史分配總量,含已釋放內(nèi)存。
uint64 sys; // 當(dāng)前消耗的內(nèi)存總量,包括 heap、fixalloc 等。
uint64 nmalloc; // 分配操作次數(shù)。
uint64 nfree; // 釋放操作次數(shù)。
// Statistics about malloc heap.
uint64 heap_alloc; // 同 alloc,在使用的 object 容量。
uint64 heap_sys; // 當(dāng)前消耗的 heap 內(nèi)存總量 (mmap-munmap, inuse+idle)。
uint64 heap_idle; // 空閑 span 容量。
uint64 heap_inuse; // 正在使用 span 容量。
uint64 heap_released; // 交還給操作系統(tǒng)的物理內(nèi)存容量。
uint64 heap_objects; // 正在使用的 object 數(shù)量。
// Statistics about garbage collector.
uint64 next_gc; // 下次垃圾回收閾值。
uint64 last_gc; // 上次垃圾回收結(jié)束時(shí)間。
uint32 numgc; // 垃圾回收次數(shù)。
};
統(tǒng)計(jì)狀態(tài)更新函數(shù)。
mgc0.c
void runtime·updatememstats(GCStats *stats)
{
// 重置狀態(tài)對(duì)象。
if(stats)
runtime·memclr((byte*)stats, sizeof(*stats));
for(mp=runtime·allm; mp; mp=mp->alllink) {
if(stats) {
src = (uint64*)&mp->gcstats;
dst = (uint64*)stats;
for(i=0; i<sizeof(*stats)/sizeof(uint64); i++)
dst[i] += src[i];
runtime·memclr((byte*)&mp->gcstats, sizeof(mp->gcstats));
}
}
// FixAlloc 正在使用內(nèi)存統(tǒng)計(jì)。
mstats.mcache_inuse = runtime·mheap.cachealloc.inuse;
mstats.mspan_inuse = runtime·mheap.spanalloc.inuse;
// 從系統(tǒng)獲取的內(nèi)存總量 (mmap-munmap)。
mstats.sys = mstats.heap_sys + mstats.stacks_sys + mstats.mspan_sys +
mstats.mcache_sys + mstats.buckhash_sys + mstats.gc_sys + mstats.other_sys;
mstats.alloc = 0;
mstats.total_alloc = 0;
mstats.nmalloc = 0;
mstats.nfree = 0;
for(i = 0; i < nelem(mstats.by_size); i++) {
mstats.by_size[i].nmalloc = 0;
mstats.by_size[i].nfree = 0;
}
// 將所有 P.cache.alloc 所持有的 spans 歸還給 central。
if(g == g->m->g0)
flushallmcaches();
else {
fn = flushallmcaches_m;
runtime·mcall(&fn);
}
// 更新 cache 統(tǒng)計(jì)。
cachestats();
// 統(tǒng)計(jì)所有 spans 里正在使用的 object。
for(i = 0; i < runtime·mheap.nspan; i++) {
s = runtime·mheap.allspans[i];
if(s->state != MSpanInUse)
continue;
// 統(tǒng)計(jì)活躍的 object。
if(s->sizeclass == 0) {
mstats.nmalloc++;
mstats.alloc += s->elemsize;
} else {
mstats.nmalloc += s->ref;
mstats.by_size[s->sizeclass].nmalloc += s->ref;
mstats.alloc += s->ref*s->elemsize;
}
}
// 按 size class 統(tǒng)計(jì)累計(jì)分配和釋放次數(shù)。
smallfree = 0;
mstats.nfree = runtime·mheap.nlargefree;
for(i = 0; i < nelem(mstats.by_size); i++) {
mstats.nfree += runtime·mheap.nsmallfree[i];
mstats.by_size[i].nfree = runtime·mheap.nsmallfree[i];
mstats.by_size[i].nmalloc += runtime·mheap.nsmallfree[i];
smallfree += runtime·mheap.nsmallfree[i] * runtime·class_to_size[i];
}
mstats.nfree += mstats.tinyallocs;
mstats.nmalloc += mstats.nfree;
// 總分配容量 = 正在使用 object + 已釋放容量。
mstats.total_alloc = mstats.alloc + runtime·mheap.largefree + smallfree;
mstats.heap_alloc = mstats.alloc;
mstats.heap_objects = mstats.nmalloc - mstats.nfree;
}
標(biāo)準(zhǔn)庫 runtime.ReadMemStats 函數(shù)可刷新并讀取該狀態(tài)數(shù)據(jù)。
啟用環(huán)境變量 GODEBUG="gotrace=1" 可輸出垃圾回收相關(guān)狀態(tài)信息,這有助于對(duì)程序運(yùn)行狀態(tài)進(jìn)行監(jiān)控,是常見的一種測(cè)試手段。
第一類輸出信息來自垃圾回收函數(shù)。
mgc0.c
static void gc(struct gc_args *args)
{
t0 = args->start_time;
// 第 1 階段: 包括 stoptheworld、clearpools 在內(nèi)的初始化時(shí)間。
if(runtime·debug.gctrace)
t1 = runtime·nanotime();
// 第 2 階段: 標(biāo)記前的準(zhǔn)備時(shí)間。包括完成上次未結(jié)束的清理操作,準(zhǔn)備并行標(biāo)記環(huán)境等。
if(runtime·debug.gctrace)
t2 = runtime·nanotime();
// 第 3 階段: 并行標(biāo)記。
if(runtime·debug.gctrace)
t3 = runtime·nanotime();
// 第 4 階段: 收縮棧內(nèi)存,更新統(tǒng)計(jì)信息。
t4 = runtime·nanotime();
if(runtime·debug.gctrace) {
heap1 = mstats.heap_alloc;
runtime·updatememstats(&stats);
obj = mstats.nmalloc - mstats.nfree;
runtime·printf(
"gc%d(%d):" // 0, 1
" %D+%D+%D+%D us," // 2, 3, 4, 5
" %D -> %D MB," // 6, 7
" %D (%D-%D) objects," // 8, 9, 10
" %d goroutines," // 11
" %d/%d/%d sweeps," // 12, 13, 14
...,
mstats.numgc, // 0: GC 執(zhí)行次數(shù)。
runtime·work.nproc, // 1: 并行標(biāo)記線程數(shù)量。
(t1-t0)/1000, // 2: 含 StopTheWorld 在內(nèi)的初始化時(shí)間。
(t2-t1)/1000, // 3: 并行標(biāo)記準(zhǔn)備時(shí)間,包括上次未完成清理任務(wù)。
(t3-t2)/1000, // 4: 并行標(biāo)記時(shí)間。
(t4-t3)/1000, // 5: 收縮棧內(nèi)存,更新狀態(tài)等時(shí)間。
heap0>>20, // 6: 上次回收后 alloc 容量。
heap1>>20, // 7: 本次回收后 alloc 容量。
obj, // 8: 本次回收后正在使用的 object 數(shù)量。
mstats.nmalloc, // 9: 總分配次數(shù)。
mstats.nfree, // 10: 總釋放次數(shù)。
runtime·gcount(), // 11: 待運(yùn)行 Goroutine 任務(wù)數(shù)量。
runtime·work.nspan, // 12: heap.spans 數(shù)量。
runtime·sweep.nbgsweep, // 13: 本次并發(fā)清理 span 次數(shù)。
runtime·sweep.npausesweep, // 14: 本次串行清理 span 次數(shù)。
...
);
}
}
在并發(fā)清理模式下,信息輸出時(shí),清理工作尚未完成,因此標(biāo)出的容量信息并不準(zhǔn)確,只能通過多次輸出結(jié)果進(jìn)行大概評(píng)估。
第二類信息來自物理內(nèi)存釋放函數(shù)。
mheap.c
void runtime·MHeap_Scavenge(int32 k, uint64 now, uint64 limit)
{
if(runtime·debug.gctrace > 0) {
// 本次釋放的物理內(nèi)存容量。
if(sumreleased > 0)
runtime·printf("scvg%d: %D MB released\n", k, (uint64)sumreleased>>20);
runtime·printf(
"scvg%d: " // 0
"inuse: %D, " // 1
"idle: %D, " // 2
"sys: %D, " // 3
"released: %D, " // 4
"consumed: %D (MB)\n", // 5
k, // 0: 釋放次數(shù)。
mstats.heap_inuse>>20, // 1: 正在使用的 spans 容量。
mstats.heap_idle>>20, // 2: 空閑 spans 容量。
mstats.heap_sys>>20, // 3: 當(dāng)前 heap 虛擬內(nèi)存總?cè)萘俊? mstats.heap_released>>20, // 4: 已釋放物理內(nèi)存總?cè)萘俊? (mstats.heap_sys - mstats.heap_released)>>20 // 5: 實(shí)際消耗內(nèi)存容量。
);
}
}
現(xiàn)代操作系統(tǒng)通常會(huì)采用機(jī)會(huì)主義分配策略。內(nèi)核雖然承諾分配內(nèi)存,但實(shí)際并不會(huì)立即分配物理內(nèi)存。只有在發(fā)生讀寫操作時(shí),內(nèi)核才會(huì)把之前承諾的內(nèi)存轉(zhuǎn)換為物理內(nèi)存。而且也不是一次性完成,而是以頁的方式逐步分配,按需執(zhí)行頁面請(qǐng)求調(diào)度和寫入時(shí)復(fù)制。
所以,相關(guān)輸出結(jié)果更多表示虛擬內(nèi)存分配值,且和具體操作系統(tǒng)也有很大關(guān)系。
調(diào)度器是運(yùn)行時(shí)最核心的內(nèi)容,其基本理論建立在三種基本對(duì)象上。
首先,每次 go 關(guān)鍵詞調(diào)用都會(huì)創(chuàng)建一個(gè) goroutine 對(duì)象,代表 G 并發(fā)任務(wù)。其次,所有 G 任務(wù)都由系統(tǒng)線程執(zhí)行,這些線程被稱作 M。
每個(gè) G 對(duì)象都有自己的獨(dú)立棧內(nèi)存。當(dāng) M 執(zhí)行任務(wù)時(shí),從 G 用來保存執(zhí)行現(xiàn)場(chǎng)的字段中恢復(fù)相關(guān)寄存器值即可。當(dāng) M 需要切換任務(wù)時(shí),將寄存器值保存回當(dāng)前 G 對(duì)象,然后從另一 G 對(duì)象中恢復(fù),如此實(shí)現(xiàn)線程多路復(fù)用。
G 初始化棧內(nèi)存只有幾 KB 大小,按需擴(kuò)張、收縮。這種輕量級(jí)設(shè)計(jì)開銷極小,可輕松創(chuàng)建成千上萬的并發(fā)任務(wù)。
除此之外,還有抽象處理器 P,其數(shù)量決定了 G 并發(fā)任務(wù)數(shù)量。每個(gè)運(yùn)行 M 都必須獲取并綁定一個(gè) P 對(duì)象,如同線程必須被調(diào)度到某個(gè) CPU Core 才能執(zhí)行。P 還為 M 提供內(nèi)存分配器緩存和 G 任務(wù)隊(duì)列等執(zhí)行資源。
通常情況下,P 數(shù)量在初始化時(shí)確定,運(yùn)行時(shí)基本固定,但 M 的數(shù)量未必和 P 對(duì)應(yīng)。例如,某 M 因系統(tǒng)調(diào)用長(zhǎng)時(shí)間阻塞,其關(guān)聯(lián) P 就會(huì)被運(yùn)行時(shí)收回。然后,調(diào)度器會(huì)喚醒或新建 M 去執(zhí)行其他排隊(duì)任務(wù)。失去 P 的 M 被休眠,直到被重新喚醒。
由匯編代碼實(shí)現(xiàn)的 bootstrap 過程。
rt0_linux_amd64.s
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
LEAQ 8(SP), SI // argv
MOVQ 0(SP), DI // argc
MOVQ $main(SB), AX
JMP AX
TEXT main(SB),NOSPLIT,$-8
MOVQ $runtime·rt0_go(SB), AX
JMP AX
要確定這個(gè)很簡(jiǎn)單,隨便找個(gè)可執(zhí)行文件,然后反匯編 entry point 即可。
(gdb) info files
Local exec file:
Entry point: 0x437940
(gdb) disass 0x437940
Dump of assembler code for function _rt0_amd64_linux:
現(xiàn)在可以確定初始化調(diào)用由 rt0_go 匯編完成。
amd_asm64.s
TEXT runtime·rt0_go(SB),NOSPLIT,$0
LEAQ runtime·g0(SB), CX
LEAQ runtime·m0(SB), AX
MOVQ CX, m_g0(AX) // save m->g0 = g0
MOVQ AX, g_m(CX) // save m0 to g0->m
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
// create a new goroutine to start program
MOVQ $runtime·main·f(SB), BP // entry
PUSHQ BP
PUSHQ $0 // arg size
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// start this M
CALL runtime·mstart(SB)
MOVL $0xf1, 0xf1 // crash
RET
按圖索驥,可以看到初始化過程相關(guān)的幾個(gè)函數(shù)都做了什么。
runtime.h
MaxGomaxprocs = 1<<8, // The max value of GOMAXPROCS.
proc.c
void runtime·schedinit(void)
{
// 設(shè)置最大 M 線程數(shù),超出會(huì)導(dǎo)致進(jìn)程崩潰。
runtime·sched.maxmcount = 10000;
// 初始化內(nèi)存分配器。
runtime·mallocinit();
// 獲取命令行參數(shù)、環(huán)境變量。
runtime·goargs();
runtime·goenvs();
// 垃圾回收器初始化。
runtime·gcinit();
// 初始化 P。
procs = 1;
p = runtime·getenv("GOMAXPROCS");
if(p != nil && (n = runtime·atoi(p)) > 0) {
if(n > MaxGomaxprocs)
n = MaxGomaxprocs;
procs = n;
}
procresize(procs);
}
其中內(nèi)存分配器、垃圾回收器前面都已研究過,此處不多費(fèi)唇舌。現(xiàn)在需要關(guān)心是 procs 這個(gè)最關(guān)鍵的 goroutine 并發(fā)控制參數(shù)。
proc.c
SchedT runtime·sched; // 調(diào)度器實(shí)例。
int32 runtime·gomaxprocs; // 當(dāng)前 GOMAXPROCS 值。
P* runtime·allp[MaxGomaxprocs+1]; // 存儲(chǔ)所有的 P 對(duì)象,最多 256 個(gè)實(shí)例。
proc.c
static void procresize(int32 new)
{
old = runtime·gomaxprocs;
// 初始化新 P 對(duì)象。
for(i = 0; i < new; i++) {
p = runtime·allp[i];
// 新建 P。
if(p == nil) {
p = runtime·newP();
p->id = i;
p->status = Pgcstop;
runtime·atomicstorep(&runtime·allp[i], p);
}
// 創(chuàng)建 P.cache。
if(p->mcache == nil) {
if(old==0 && i==0)
p->mcache = g->m->mcache; // bootstrap
else
p->mcache = runtime·allocmcache();
}
}
// 將 old P 里面的任務(wù)重新分布。
empty = false;
while(!empty) {
empty = true;
// 內(nèi)層 for 循環(huán)遍歷所有 old P,每次從中取一個(gè) G 任務(wù)。
// 外層 while 循環(huán)重復(fù)該過程,如此所有先生成的 G 會(huì)保存到全局隊(duì)列的前面,F(xiàn)IFO。
for(i = 0; i < old; i++) {
p = runtime·allp[i];
// 檢查 P 的 G 任務(wù)隊(duì)列。
if(p->runqhead == p->runqtail)
continue;
empty = false;
// 獲取尾部最后一個(gè) G。
p->runqtail--;
gp = p->runq[p->runqtail%nelem(p->runq)];
// 將 G 添加到全局任務(wù)鏈表。
gp->schedlink = runtime·sched.runqhead;
runtime·sched.runqhead = gp;
if(runtime·sched.runqtail == nil)
runtime·sched.runqtail = gp;
runtime·sched.runqsize++;
}
}
// 將最多 new * (256/2) 個(gè)任務(wù)轉(zhuǎn)移到 P 本地隊(duì)列。
for(i = 1; i < new * nelem(p->runq)/2 && runtime·sched.runqsize > 0; i++) {
gp = runtime·sched.runqhead;
runtime·sched.runqhead = gp->schedlink;
if(runtime·sched.runqhead == nil)
runtime·sched.runqtail = nil;
runtime·sched.runqsize--;
runqput(runtime·allp[i%new], gp);
}
// 如果 new < old,"釋放" 掉多余的 P 對(duì)象。
for(i = new; i < old; i++) {
p = runtime·allp[i];
runtime·freemcache(p->mcache);
p->mcache = nil;
gfpurge(p);
p->status = Pdead;
// can't free P itself because it can be referenced by an M in syscall
}
// 關(guān)聯(lián) P 到當(dāng)前 M。
p = runtime·allp[0];
acquirep(p);
// 將其他 P 放到空閑隊(duì)列。
for(i = new-1; i > 0; i--) {
p = runtime·allp[i];
p->status = Pidle;
pidleput(p);
}
runtime·atomicstore((uint32*)&runtime·gomaxprocs, new);
}
待運(yùn)行的 G 任務(wù)保存在 P 本地隊(duì)列和全局隊(duì)列中,因此增加或減少 P 數(shù)量都需要重新分布這些任務(wù)。還須確保先生成的 G 任務(wù)優(yōu)先放到隊(duì)列頭部,以優(yōu)先執(zhí)行。
在完成調(diào)度器初始化后,創(chuàng)建新 goroutine 運(yùn)行 main 函數(shù)。
proc.go
// The main goroutine.
func main() {
// 當(dāng)前 G。
g := getg()
// 確定最大棧內(nèi)存大小。
if ptrSize == 8 {
maxstacksize = 1000000000 // 1 GB
} else {
maxstacksize = 250000000 // 250 MB
}
// 使用單獨(dú)線程運(yùn)行 sysmon。
onM(newsysmon)
runtime_init()
main_init()
main_main()
// 終止進(jìn)程。
exit(0)
}
編譯器會(huì)將每條 go func 語句編譯成 newproc 函數(shù)調(diào)用,創(chuàng)建 G 對(duì)象。
反編譯一個(gè)簡(jiǎn)單的示例。
test.go
package main
import ()
func main() {
go println("Hello, World!")
}
(gdb) disass main.main
Dump of assembler code for function main.main:
0x000000000000202f <+47>:" lea rcx,[rip+0xff582] # 0x1015b8 <main.print.1.f>
0x0000000000002036 <+54>:" push rcx
0x0000000000002037 <+55>:" push 0x10
0x0000000000002039 <+57>:" call 0x2e880 <runtime.newproc>
先熟悉 G 里面幾個(gè)常見的字段成員。
runtime.h
struct Stack
{
uintptr lo; // 棧內(nèi)存開始地址。
uintptr hi; // 結(jié)束地址。
};
struct Gobuf
{
uintptr sp; // 對(duì)應(yīng) SP 寄存器。
uintptr pc; // IP/PC 寄存器。
void* ctxt;
uintreg ret;
uintptr lr; // ARM LR 寄存器。
};
struct G
{
Stack stack; // 自定義棧。
uintptr stackguard0; // 棧溢出檢查邊界。
Gobuf sched; // 執(zhí)行現(xiàn)場(chǎng)。
G* schedlink; // 鏈表。
};
跟蹤 newproc 的調(diào)用過程,最終目標(biāo)是 newproc1。
proc.c
G* runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
siz = narg + nret;
siz = (siz+7) & ~7; // 8 字節(jié)對(duì)齊
// 當(dāng)前 P。
p = g->m->p;
// 獲取可復(fù)用的空閑 G 對(duì)象,或新建。
if((newg = gfget(p)) == nil) {
newg = runtime·malg(StackMin);
runtime·casgstatus(newg, Gidle, Gdead);
// 添加到 allg 全局變量。
runtime·allgadd(newg);
}
// 將參數(shù)和返回值入棧。
sp = (byte*)newg->stack.hi;
sp -= 4*sizeof(uintreg);
sp -= siz;
runtime·memmove(sp, argp, narg);
// thechar 5 代表 ARM,在 arch_xxx.h 中定義。
// 因?yàn)?ARM 需要額外保存 Caller's LR 寄存器值。
if(thechar == '5') {
// caller's LR
sp -= sizeof(void*);
*(void**)sp = nil;
}
// 在 sched 里保存執(zhí)行現(xiàn)場(chǎng)參數(shù)。
runtime·memclr((byte*)&newg->sched, sizeof newg->sched);
newg->sched.sp = (uintptr)sp;
newg->sched.pc = (uintptr)runtime·goexit + PCQuantum;
newg->sched.g = newg;
// 這個(gè)調(diào)用很關(guān)鍵,不過我們?cè)诤竺嬖斦f。
runtime·gostartcallfn(&newg->sched, fn);
newg->gopc = (uintptr)callerpc;
runtime·casgstatus(newg, Gdead, Grunnable);
// 將生成的 G 對(duì)象放到 P 本地隊(duì)列或全局隊(duì)列。
runqput(p, newg);
// 如果有空閑 P,且沒有處于自旋狀態(tài)的 M ...
if(runtime·atomicload(&runtime·sched.npidle) != 0 &&
runtime·atomicload(&runtime·sched.nmspinning) == 0 &&
fn->fn != runtime·main)
// 喚醒一個(gè)休眠的 M,或新建。
wakep();
return newg;
}
提取可復(fù)用 G 對(duì)象,將參數(shù)、返回值入棧,設(shè)置執(zhí)行現(xiàn)場(chǎng)的寄存器值。最后,放到待運(yùn)行隊(duì)列等待被 M 執(zhí)行。
P 使用 gfree 鏈表存儲(chǔ)可復(fù)用 G 對(duì)象,這很好理解。除本地復(fù)用鏈表外,還有一個(gè)全局復(fù)用鏈表。當(dāng)某 P 本地鏈表過長(zhǎng)時(shí),就轉(zhuǎn)移一部分到全局鏈表,以供其他 P 使用。
runtime.h
struct SchedT
{
// Global cache of dead G's. (任務(wù)結(jié)束,復(fù)用對(duì)象)
G* gfree;
int32 ngfree;
};
struct P
{
// Available G's (status == Gdead)
G* gfree;
int32 gfreecnt;
};
proc.c
static G* gfget(P *p)
{
G *gp;
void (*fn)(G*);
retry:
// 從 P 本地鏈表獲取一個(gè)可復(fù)用 G 對(duì)象。
gp = p->gfree;
// 如果為空,轉(zhuǎn)向全局鏈表。
if(gp == nil && runtime·sched.gfree) {
// 從全局鏈表提取一些復(fù)用對(duì)象到本地,直到填滿 32 個(gè)。
while(p->gfreecnt < 32 && runtime·sched.gfree != nil) {
p->gfreecnt++;
gp = runtime·sched.gfree;
runtime·sched.gfree = gp->schedlink;
runtime·sched.ngfree--;
gp->schedlink = p->gfree;
p->gfree = gp;
}
// 填充后再從本地鏈表獲取。
goto retry;
}
// 如果找到可復(fù)用 G 對(duì)象。
if(gp) {
// 調(diào)整本地鏈表。
p->gfree = gp->schedlink;
p->gfreecnt--;
// 檢查自定義棧。
if(gp->stack.lo == 0) {
// 重新分配棧內(nèi)存。
if(g == g->m->g0) {
gp->stack = runtime·stackalloc(FixedStack);
} else {
g->m->scalararg[0] = FixedStack;
g->m->ptrarg[0] = gp;
fn = mstackalloc;
runtime·mcall(&fn);
g->m->ptrarg[0] = nil;
}
// 設(shè)置棧頂。
gp->stackguard0 = gp->stack.lo + StackGuard;
}
}
return gp;
}
暫時(shí)不去理會(huì)自定義棧,后面有專門的章節(jié)說明這個(gè)問題。
沒有可復(fù)用對(duì)象時(shí),新建。
proc.c
G* runtime·malg(int32 stacksize)
{
G *newg;
void (*fn)(G*);
// 新建 G 對(duì)象。
newg = allocg();
// 分配自定義棧內(nèi)存。
if(stacksize >= 0) {
stacksize = runtime·round2(StackSystem + stacksize);
if(g == g->m->g0) {
newg->stack = runtime·stackalloc(stacksize);
} else {
g->m->scalararg[0] = stacksize;
g->m->ptrarg[0] = newg;
fn = mstackalloc;
runtime·mcall(&fn);
g->m->ptrarg[0] = nil;
}
newg->stackguard0 = newg->stack.lo + StackGuard;
newg->stackguard1 = ~(uintptr)0;
}
return newg;
}
static G* allocg(void)
{
return runtime·newG();
}
proc.go
func newG() *g {
return new(g)
}
新建 G 對(duì)象被添加到全局變量 allg。
proc.c
Slice runtime·allgs; // Go Slice。
G** runtime·allg; // 當(dāng)前所有 G 對(duì)象,包括完成任務(wù),等待復(fù)用的。
uintptr runtime·allglen; // 數(shù)量。
proc.go
func allgadd(gp *g) {
allgs = append(allgs, gp)
allg = &allgs[0]
allglen = uintptr(len(allgs))
}
所有參數(shù)設(shè)置好后,G 對(duì)象所代表的并發(fā)任務(wù)被放入待運(yùn)行隊(duì)列。
runtime.h
struct SchedT
{
// Global runnable queue. (待運(yùn)行任務(wù))
G* runqhead;
G* runqtail;
int32 runqsize;
};
struct P
{
// Queue of runnable goroutines. (用數(shù)組實(shí)現(xiàn)的環(huán)狀隊(duì)列)
uint32 runqhead;
uint32 runqtail;
G* runq[256];
};
proc.c
static void runqput(P *p, G *gp)
{
uint32 h, t;
retry:
// 很典型的數(shù)組環(huán)狀隊(duì)列實(shí)現(xiàn)。
// 累加 head、tail 位置計(jì)數(shù)器,然后取模獲取實(shí)際存儲(chǔ)索引。
h = runtime·atomicload(&p->runqhead);
t = p->runqtail;
if(t - h < nelem(p->runq)) {
p->runq[t%nelem(p->runq)] = gp;
runtime·atomicstore(&p->runqtail, t+1);
return;
}
// 如果本地隊(duì)列已滿,則放入全局待運(yùn)行隊(duì)列。
if(runqputslow(p, gp, h, t))
return;
goto retry;
}
static bool runqputslow(P *p, G *gp, uint32 h, uint32 t)
{
// 從本地隊(duì)列提取一半待運(yùn)行 G 任務(wù)。
n = t-h;
n = n/2;
for(i=0; i<n; i++)
batch[i] = p->runq[(h+i)%nelem(p->runq)];
// 調(diào)整本地隊(duì)列位置。
if(!runtime·cas(&p->runqhead, h, h+n))
return false;
// 添加當(dāng)前 G。
batch[n] = gp;
// 鏈表結(jié)構(gòu)。
for(i=0; i<n; i++)
batch[i]->schedlink = batch[i+1];
// 將這一批 G 放到全局隊(duì)列。
globrunqputbatch(batch[0], batch[n], n+1);
return true;
}
static void globrunqputbatch(G *ghead, G *gtail, int32 n)
{
// 直接將鏈表附加到全局鏈表尾部。
gtail->schedlink = nil;
if(runtime·sched.runqtail)
runtime·sched.runqtail->schedlink = ghead;
else
runtime·sched.runqhead = ghead;
runtime·sched.runqtail = gtail;
runtime·sched.runqsize += n;
}
兩個(gè)隊(duì)列采用了不同的設(shè)計(jì)。本地隊(duì)列長(zhǎng)度固定,用數(shù)組自然是效率最高。而全局隊(duì)列長(zhǎng)度未知,只能用鏈表實(shí)現(xiàn)。
調(diào)度器在很多地方都采用兩級(jí)隊(duì)列設(shè)計(jì),本地隊(duì)列是為了當(dāng)前線程無鎖獲取資源,而全局隊(duì)列則是為了在多個(gè) P/M 間進(jìn)行平衡。當(dāng) P 管理的對(duì)象數(shù)量過多時(shí)就會(huì)上交一部分到全局,反過來,就從全局提取一批到本地??傊?,最終目的是為了更好地復(fù)用內(nèi)存,更快地完成任務(wù)執(zhí)行。
不管語言層面如何抽象,所有 G 任務(wù)總歸要由線程執(zhí)行,每個(gè)系統(tǒng)線程對(duì)應(yīng)一個(gè) M。
runtime.h
struct M
{
G* g0; // 運(yùn)行時(shí)管理?xiàng)!? void (*mstartfn)(void); // 啟動(dòng)函數(shù),比如執(zhí)行 sysmon。
G* curg; // 當(dāng)前運(yùn)行的 G。
P* p; // 當(dāng)前關(guān)聯(lián)的 P。
P* nextp; // 臨時(shí)存放獲取的 P,用于后續(xù)任務(wù)。
Note park; // 休眠標(biāo)記。
M* alllink; // 全局 allm 鏈表。
};
先了解 M 的創(chuàng)建過程。
proc.c
static void newm(void(*fn)(void), P *p)
{
// 創(chuàng)建 M 對(duì)象。
mp = runtime·allocm(p);
// 設(shè)置待綁定 P 和啟動(dòng)函數(shù)。
mp->nextp = p;
mp->mstartfn = fn;
// 創(chuàng)建系統(tǒng)線程。
runtime·newosproc(mp, (byte*)mp->g0->stack.hi);
}
M* runtime·allocm(P *p)
{
mp = runtime·newM();
// 初始化。
mcommoninit(mp);
// 創(chuàng)建一個(gè) G,用于初始化 g0 棧。
if(runtime·iscgo || Solaris || Windows || Plan9)
mp->g0 = runtime·malg(-1);
else
mp->g0 = runtime·malg(8192);
mp->g0->m = mp;
return mp;
}
調(diào)度器會(huì)檢查 M 總數(shù),如超出限制會(huì)導(dǎo)致進(jìn)程崩潰。默認(rèn) 10000,多數(shù)時(shí)候無需關(guān)心,也可調(diào)用 debug/SetMaxThreads 修改。
proc.c
static void mcommoninit(M *mp)
{
// 增加計(jì)數(shù)器,設(shè)置 ID。
mp->id = runtime·sched.mcount++;
// 檢查系統(tǒng)當(dāng)前 M 總數(shù),如果超出限制,引發(fā)進(jìn)程崩潰。
checkmcount();
// 添加到全局鏈表。
mp->alllink = runtime·allm;
runtime·atomicstorep(&runtime·allm, mp);
}
static void checkmcount(void)
{
if(runtime·sched.mcount > runtime·sched.maxmcount){
runtime·printf("runtime: program exceeds %d-thread limit\n",
runtime·sched.maxmcount);
runtime·throw("thread exhaustion");
}
}
最關(guān)鍵的是 newosproc 創(chuàng)建系統(tǒng)線程。
os_linux.c
void runtime·newosproc(M *mp, void *stk)
{
flags = CLONE_VM /* share memory */
| CLONE_FS /* share cwd, etc */
| CLONE_FILES /* share fd table */
| CLONE_SIGHAND /* share sig handler table */
| CLONE_THREAD; /* revisit - okay for now */
ret = runtime·clone(flags, stk, mp, mp->g0, runtime·mstart);
}
os_darwin.c
void runtime·newosproc(M *mp, void *stk)
{
errno = runtime·bsdthread_create(stk, mp, mp->g0, runtime·mstart);
}
我們看到了線程函數(shù) mstart,這是后面要跟蹤的目標(biāo)。
M 有個(gè)很神秘的 g0 成員,它被傳遞給 newosproc 作為線程棧內(nèi)存,用來執(zhí)行運(yùn)行時(shí)管理指令,以避免在 G 用戶棧上切換上下文。
假如 M 線程直接使用 G 棧,那么就不能在執(zhí)行管理操作時(shí)將它放回隊(duì)列,也不能轉(zhuǎn)交給其他 M 執(zhí)行,那會(huì)導(dǎo)致多個(gè)線程共用棧內(nèi)存。同樣不能執(zhí)行用戶棧的擴(kuò)張或收縮操作。因此,在執(zhí)行管理指令前,必須將線程棧切換到 g0。
在前面章節(jié)中時(shí)常出現(xiàn)的 onM、mcall 就是用 g0 來執(zhí)行管理命令。
runtime.h
struct M
{
uintptr scalararg[4]; // scalar argument/return for mcall
void* ptrarg[4]; // pointer argument/return for mcall
};
asm_amd64.s
TEXT runtime·mcall(SB), NOSPLIT, $0-8
MOVQ fn+0(FP), DI // DI 保存要運(yùn)行的管理函數(shù)指針。
// 保存當(dāng)前 G 執(zhí)行現(xiàn)場(chǎng)。
get_tls(CX)
MOVQ g(CX), AX // save state in g->sched
MOVQ 0(SP), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(AX)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(AX)
MOVQ AX, (g_sched+gobuf_g)(AX)
// 切換到 g0 棧,執(zhí)行管理函數(shù)。
MOVQ g(CX), BX // g
MOVQ g_m(BX), BX // g.m
MOVQ m_g0(BX), SI // m.g0
MOVQ SI, g(CX) // g = m->g0
MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp
PUSHQ AX
MOVQ DI, DX
MOVQ 0(DI), DI
CALL DI // fn arg
RET
在創(chuàng)建 G 時(shí),調(diào)度器會(huì)調(diào)用 wakep 喚醒 M 執(zhí)行任務(wù)。
proc.c
static void wakep(void)
{
startm(nil, true);
}
static void startm(P *p, bool spinning)
{
M *mp;
void (*fn)(void);
// 獲取空閑 P。如果沒有,直接返回。
if(p == nil) {
p = pidleget();
if(p == nil) {
return;
}
}
// 獲取空閑 M, 或新建。
mp = mget();
if(mp == nil) {
fn = nil;
newm(fn, p);
return;
}
// 臨時(shí)保存待用 P。
mp->nextp = p;
// 喚醒。
runtime·notewakeup(&mp->park);
}
static M* mget(void)
{
// 從空閑列表獲取 M。
if((mp = runtime·sched.midle) != nil){
runtime·sched.midle = mp->schedlink;
runtime·sched.nmidle--;
}
return mp;
}
當(dāng) M 線程找不到后續(xù)待運(yùn)行 G 任務(wù),或因某種原因被剝奪關(guān)聯(lián) P 時(shí),會(huì)休眠線程,并被保存到 sched.midle 空閑鏈表中,直到被重新獲取、喚醒。
proc.c
static void stopm(void)
{
...
retry:
// 添加到空閑鏈表。
mput(g->m);
// 休眠線程,直到被喚醒后繼續(xù)執(zhí)行。
runtime·notesleep(&g->m->park);
// 被喚醒后,清除休眠標(biāo)志。
runtime·noteclear(&g->m->park);
// 處理 GC 任務(wù) (這個(gè)因?yàn)?StopTheWorld,并不需要 P)。
if(g->m->helpgc) {
runtime·gchelper();
g->m->helpgc = 0;
g->m->mcache = nil;
goto retry;
}
// 既然被喚醒,必然獲取了可用 P,關(guān)聯(lián)。
acquirep(g->m->nextp);
g->m->nextp = nil;
}
static void mput(M *mp)
{
// 添加到空閑鏈表。
mp->schedlink = runtime·sched.midle;
runtime·sched.midle = mp;
runtime·sched.nmidle++;
}
static void acquirep(P *p)
{
g->m->mcache = p->mcache;
g->m->p = p;
p->m = g->m;
p->status = Prunning;
}
休眠操作通過 futex 實(shí)現(xiàn),這是一種快速用戶區(qū)互斥實(shí)現(xiàn)。該鎖定在用戶空間用原子指令完成,只在結(jié)果不一致時(shí)才進(jìn)入系統(tǒng)內(nèi)核,有非常高的執(zhí)行效率。
lock_futex.go
func notesleep(n *note) {
for atomicload(key32(&n.key)) == 0 {
futexsleep(key32(&n.key), 0, -1) // 休眠直到被喚醒 (timeout = -1)。
} // 喚醒后,n.key = 1,終止循環(huán)。
}
os_linux.c
void runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
Timespec ts;
// 不超時(shí)。
if(ns < 0) {
runtime·futex(addr, FUTEX_WAIT, val, nil, nil, 0);
return;
}
ts.tv_nsec = 0;
ts.tv_sec = runtime·timediv(ns, 1000000000LL, (int32*)&ts.tv_nsec);
runtime·futex(addr, FUTEX_WAIT, val, &ts, nil, 0);
}
喚醒操作會(huì)修改標(biāo)記值,成功后調(diào)用 noteclear 重置狀態(tài)。
lock_futex.go
func notewakeup(n *note) {
old := xchg(key32(&n.key), 1)
futexwakeup(key32(&n.key), 1)
}
func noteclear(n *note) {
n.key = 0
}
os_linux.c
void runtime·futexwakeup(uint32 *addr, uint32 cnt)
{
ret = runtime·futex(addr, FUTEX_WAKE, cnt, nil, nil, 0);
}
線程函數(shù) mstart 讓 M 進(jìn)入調(diào)度器核心循環(huán),它不停從 P 本地隊(duì)列、全局隊(duì)列查找并執(zhí)行待運(yùn)行 G 任務(wù)。期間,會(huì)處理一下垃圾回收等額外操作,完成后繼續(xù)回來執(zhí)行任務(wù)。
proc.c
static void mstart(void)
{
// 執(zhí)行啟動(dòng)函數(shù)。
if(g->m->mstartfn)
g->m->mstartfn();
if(g->m->helpgc) {
// 如果正在垃圾回收,休眠線程。
g->m->helpgc = 0;
stopm();
} else if(g->m != &runtime·m0) {
// 關(guān)聯(lián) P。
acquirep(g->m->nextp);
g->m->nextp = nil;
}
// 執(zhí)行調(diào)度函數(shù)。
schedule();
}
核心循環(huán)過程: schedule -> execute -> G.func -> goexit 。
proc.c
static void schedule(void)
{
gp = nil;
// 當(dāng)前 P 任務(wù)執(zhí)行次數(shù)計(jì)數(shù)器。
tick = g->m->p->schedtick;
// 每隔 61 次,就從全局隊(duì)列提取一個(gè)任務(wù),以確保公平。
// This is a fancy way to say tick%61==0,
if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime·sched.runqsize > 0) {
gp = globrunqget(g->m->p, 1); // 僅返回一個(gè) G,不轉(zhuǎn)移。
}
// 從本地隊(duì)列提取任務(wù)。
if(gp == nil) {
gp = runqget(g->m->p);
}
// 從其他地方查找任務(wù)。
if(gp == nil) {
gp = findrunnable(); // blocks until work is available
}
// 執(zhí)行任務(wù)。
execute(gp);
}
全局隊(duì)列存儲(chǔ)了超出 P 本地?cái)?shù)量限制的待運(yùn)行任務(wù),是所有 P/M 的后備資源。
proc.c
static G* globrunqget(P *p, int32 max)
{
G *gp, *gp1;
int32 n;
if(runtime·sched.runqsize == 0)
return nil;
// 確定要轉(zhuǎn)移的任務(wù)數(shù)。
n = runtime·sched.runqsize/runtime·gomaxprocs+1;
if(n > runtime·sched.runqsize)
n = runtime·sched.runqsize;
if(max > 0 && n > max)
n = max;
if(n > nelem(p->runq)/2)
n = nelem(p->runq)/2;
runtime·sched.runqsize -= n;
if(runtime·sched.runqsize == 0)
runtime·sched.runqtail = nil;
// 將第一個(gè)任務(wù)返回。
gp = runtime·sched.runqhead;
runtime·sched.runqhead = gp->schedlink;
n--;
// 轉(zhuǎn)移一批任務(wù)到本地隊(duì)列。
while(n--) {
gp1 = runtime·sched.runqhead;
runtime·sched.runqhead = gp1->schedlink;
runqput(p, gp1);
}
return gp;
}
本地隊(duì)列優(yōu)先為線程提供無鎖任務(wù)獲取。
proc.c
static G* runqget(P *p)
{
G *gp;
uint32 t, h;
// 從數(shù)組循環(huán)隊(duì)列返回任務(wù)。
for(;;) {
h = runtime·atomicload(&p->runqhead);
t = p->runqtail;
if(t == h)
return nil;
gp = p->runq[h%nelem(p->runq)];
if(runtime·cas(&p->runqhead, h, h+1))
return gp;
}
}
如果本地和全局隊(duì)列中都沒找到可用任務(wù),調(diào)度器就會(huì)費(fèi)盡心思檢查各個(gè)角落。包括網(wǎng)絡(luò)任務(wù),甚至是從其他 P 隊(duì)列中偷一些過來。
proc.c
static G* findrunnable(void)
{
top:
// 本地隊(duì)列。
gp = runqget(g->m->p);
if(gp)
return gp;
// 全局隊(duì)列。
if(runtime·sched.runqsize) {
gp = globrunqget(g->m->p, 0); // 轉(zhuǎn)移一批到本地。
if(gp)
return gp;
}
// 網(wǎng)絡(luò)任務(wù)。
gp = runtime·netpoll(false); // non-blocking
if(gp) {
injectglist(gp->schedlink); // 插入全局隊(duì)列。
runtime·casgstatus(gp, Gwaiting, Grunnable);
return gp;
}
// 從其他 P 偷一些任務(wù)。
for(i = 0; i < 2*runtime·gomaxprocs; i++) {
p = runtime·allp[runtime·fastrand1()%runtime·gomaxprocs]; // 隨機(jī)選擇。
if(p == g->m->p) // 當(dāng)前 P。
gp = runqget(p);
else // 其他 P。
gp = runqsteal(g->m->p, p);
if(gp)
return gp;
}
stop:
// 再次檢查全局隊(duì)列。
if(runtime·sched.runqsize) {
gp = globrunqget(g->m->p, 0);
return gp;
}
// 解除 P 綁定,返回空閑列表。
p = releasep();
pidleput(p);
// 循環(huán)檢查其他 P 任務(wù)列表,如果有未完成任務(wù),那么跳轉(zhuǎn)到 top 重試,以便偷一些過來。
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p && p->runqhead != p->runqtail) {
// 重新關(guān)聯(lián) P。
p = pidleget();
if(p) {
acquirep(p);
goto top;
}
break;
}
}
// 再次檢查網(wǎng)絡(luò)任務(wù)。
if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
gp = runtime·netpoll(true); // block until new work is available
runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
if(gp) {
p = pidleget();
if(p) {
// 重新關(guān)聯(lián) P。
acquirep(p);
// 將其他任務(wù)添加到全局隊(duì)列。
injectglist(gp->schedlink);
runtime·casgstatus(gp, Gwaiting, Grunnable);
return gp;
}
// 如果沒有可用 P,添加到全局隊(duì)列。
injectglist(gp);
}
}
// 如果什么任務(wù)都沒拿到,休眠當(dāng)前線程。
stopm();
goto top;
}
這種偷竊行為就是官方文檔所提及的 workstealing算法。
proc.c
static G* runqsteal(P *p, P *p2)
{
G *gp;
G *batch[nelem(p->runq)/2];
// 將 P2 一半任務(wù)轉(zhuǎn)移到 batch。
n = runqgrab(p2, batch);
if(n == 0)
return nil;
// 返回一個(gè)任務(wù)。
n--;
gp = batch[n];
if(n == 0)
return gp;
// 將剩余任務(wù)添加到 P 隊(duì)列。
h = runtime·atomicload(&p->runqhead);
t = p->runqtail;
for(i=0; i<n; i++, t++)
p->runq[t%nelem(p->runq)] = batch[i];
runtime·atomicstore(&p->runqtail, t);
return gp;
}
等拿到 G 任務(wù),接下來就交由 execute 負(fù)責(zé)執(zhí)行。
proc.c
static void execute(G *gp)
{
// 修改狀態(tài)。
runtime·casgstatus(gp, Grunnable, Grunning);
gp->waitsince = 0;
gp->preempt = false;
gp->stackguard0 = gp->stack.lo + StackGuard;
g->m->p->schedtick++; // 執(zhí)行計(jì)數(shù)器。
g->m->curg = gp;
gp->m = g->m;
runtime·gogo(&gp->sched);
}
asm_amd64.s
TEXT runtime·gogo(SB), NOSPLIT, $0-8
MOVQ buf+0(FP), BX // gobuf
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
get_tls(CX)
MOVQ DX, g(CX)
MOVQ gobuf_sp(BX), SP // 從 G.sched 恢復(fù)執(zhí)行現(xiàn)場(chǎng)。
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_pc(BX), BX // G.sched.pc 指向 goroutine 任務(wù)函數(shù)。
JMP BX // 執(zhí)行該函數(shù)。
匯編函數(shù)從 sched 恢復(fù)現(xiàn)場(chǎng),將寄存器指向 G 用戶棧,然后跳轉(zhuǎn)到任務(wù)函數(shù)開始執(zhí)行。
這里有個(gè)問題,匯編函數(shù)并沒有保存 execute 返回現(xiàn)場(chǎng),也就是說等任務(wù)函數(shù)結(jié)束后,執(zhí)行緒不會(huì)回到 execute。那 goexit 如何執(zhí)行?如何繼續(xù)循環(huán)調(diào)用?
要解釋清這個(gè)問題,需要埋一個(gè)在創(chuàng)建任務(wù)時(shí)留下的坑:gostartcallfn。
proc.c
G* runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
...
newg->sched.sp = (uintptr)sp;
newg->sched.pc = (uintptr)runtime·goexit + PCQuantum;
newg->sched.g = newg;
runtime·gostartcallfn(&newg->sched, fn);
...
return newg;
}
stack.c
void runtime·gostartcallfn(Gobuf *gobuf, FuncVal *fv)
{
runtime·gostartcall(gobuf, fn, fv);
}
sys_x86.c
void runtime·gostartcall(Gobuf *gobuf, void (*fn)(void), void *ctxt)
{
sp = (uintptr*)gobuf->sp;
// 將 pc,也就是 goexit 地址入棧。
*--sp = (uintptr)gobuf->pc;
gobuf->sp = (uintptr)sp;
// 將 pc 指向 fn。
gobuf->pc = (uintptr)fn;
gobuf->ctxt = ctxt;
}
很有意思,在 gostartcall 中,提前將 goexit 地址壓入 G 棧。
匯編函數(shù) gogo 是 long jmp,也就是說當(dāng)任務(wù)函數(shù)執(zhí)行結(jié)束時(shí),其尾部 RET 指令從棧上彈給 PC 寄存器的是 goexit 地址,這就是秘密所在。
asm_amd64.s
TEXT runtime·goexit(SB),NOSPLIT,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
proc.c
void runtime·goexit1(void)
{
fn = goexit0;
runtime·mcall(&fn);
}
static void goexit0(G *gp)
{
runtime·casgstatus(gp, Grunning, Gdead);
// 將 G 放回 P.gfree 復(fù)用鏈表。
gfput(g->m->p, gp);
schedule();
}
任務(wù)結(jié)束,當(dāng)前 G 對(duì)象被放回復(fù)用鏈表,而線程則繼續(xù) schedule 循環(huán)往復(fù)。
proc.c
static void gfput(P *p, G *gp)
{
stksize = gp->stack.hi - gp->stack.lo;
// 如果不是默認(rèn)棧,釋放。
if(stksize != FixedStack) {
runtime·stackfree(gp->stack);
gp->stack.lo = 0;
gp->stack.hi = 0;
gp->stackguard0 = 0;
}
// 添加到復(fù)用鏈表。
gp->schedlink = p->gfree;
p->gfree = gp;
p->gfreecnt++;
// 如果 P 復(fù)用鏈表過長(zhǎng) ...
if(p->gfreecnt >= 64) {
// 將超出的復(fù)用對(duì)象轉(zhuǎn)移到全局鏈表。
while(p->gfreecnt >= 32) {
p->gfreecnt--;
gp = p->gfree;
p->gfree = gp->schedlink;
gp->schedlink = runtime·sched.gfree;
runtime·sched.gfree = gp;
runtime·sched.ngfree++;
}
}
}
另外,在 schedule 里還提到 lockedg,這表示該 G 任務(wù)只能交由指定 M 執(zhí)行,且該 M在綁定解除前會(huì)被休眠,不再執(zhí)行其他任務(wù)。當(dāng) M 遇到 lockedg,需要將 P 和 G 都交給綁定 M 去執(zhí)行。
proc.c
static void schedule(void)
{
// 如果當(dāng)前 M 被綁定給某個(gè) G,那么交出 P,休眠。
// 直到被某個(gè)拿到 locked G 的 M 喚醒。
if(g->m->lockedg) {
stoplockedm();
execute(g->m->lockedg); // Never returns.
}
...
// 如果當(dāng)前 G 被綁定到某個(gè) M,那么將 P 和 G 都交給對(duì)方。
// 喚醒綁定 M,自己回空閑隊(duì)列。
if(gp->lockedm) {
startlockedm(gp);
goto top; // 喚醒后回到頭部重新獲取任務(wù)。
}
}
static void startlockedm(G *gp)
{
// 獲取 G 綁定的 M。
mp = gp->lockedm;
// 將當(dāng)前 P 交給對(duì)方,并喚醒。
p = releasep();
mp->nextp = p;
runtime·notewakeup(&mp->park);
// 休眠當(dāng)前 M。
stopm();
}
static void stoplockedm(void)
{
// 上交 P,喚醒其他 M 執(zhí)行任務(wù)。
if(g->m->p) {
p = releasep();
handoffp(p);
}
// 休眠,直到被拿到 locked G 的 M 喚醒。
runtime·notesleep(&g->m->park);
runtime·noteclear(&g->m->park);
// 綁定對(duì)方交過來的 P。
acquirep(g->m->nextp);
g->m->nextp = nil;
}
static void goexit0(G *gp)
{
// 解除綁定。
gp->lockedm = nil;
g->m->lockedg = nil;
schedule();
}
在 cgo 里就用 lockOSThread 鎖定線程。
proc.c
static void lockOSThread(void)
{
g->m->lockedg = g;
g->lockedm = g->m;
}
static void unlockOSThread(void)
{
g->m->lockedg = nil;
g->lockedm = nil;
}
cgocall.go
func cgocall(fn, arg unsafe.Pointer) {
cgocall_errno(fn, arg)
}
func cgocall_errno(fn, arg unsafe.Pointer) int32 {
/*
* Lock g to m to ensure we stay on the same stack if we do a
* cgo callback. Add entry to defer stack in case of panic.
*/
lockOSThread()
mp := getg().m
mp.ncgocall++
mp.ncgo++
defer endcgo(mp)
/*
* Announce we are entering a system call
* so that the scheduler knows to create another
* M to run goroutines while we are in the
* foreign code.
*/
entersyscall()
errno := asmcgocall_errno(fn, arg) // 切換到 g0 stack 執(zhí)行。
exitsyscall()
return errno
}
func endcgo(mp *m) {
unlockOSThread()
}
調(diào)度器提供了兩種方式暫時(shí)中斷 G 任務(wù)執(zhí)行。
proc.go
func Gosched() {
mcall(gosched_m) // mcall 保存執(zhí)行現(xiàn)場(chǎng)到 G.sched,然后切換到 g0 棧。
}
proc.c
void runtime·gosched_m(G *gp)
{
// 將狀態(tài)從正在運(yùn)行調(diào)整為可運(yùn)行。
runtime·casgstatus(gp, Grunning, Grunnable);
// 將 G 重新放回全局隊(duì)列。
globrunqput(gp);
// 當(dāng)前 M 繼續(xù)查找并執(zhí)行其他任務(wù)。
schedule();
}
與 gosched 不同,gopark 并不會(huì)將 G 放回待運(yùn)行隊(duì)列。
proc.go
func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) {
mcall(park_m)
}
proc.c
void runtime·park_m(G *gp)
{
// 修改狀態(tài)為等待。
runtime·casgstatus(gp, Grunning, Gwaiting);
// 當(dāng)前 M 繼續(xù)獲取并執(zhí)行其他任務(wù)。
schedule();
}
直到顯式調(diào)用 goready 將該 G 重新放回隊(duì)列。
proc.go
func goready(gp *g) {
onM(ready_m)
}
proc.c
void runtime·ready_m(void)
{
runtime·ready(gp);
}
void runtime·ready(G *gp)
{
// 修改狀態(tài)為可運(yùn)行。
runtime·casgstatus(gp, Gwaiting, Grunnable);
// 將 G 重新放回本地待運(yùn)行隊(duì)列。
runqput(g->m->p, gp);
// 喚醒某個(gè) M 執(zhí)行任務(wù)。
if(runtime·atomicload(&runtime·sched.npidle) != 0 &&
runtime·atomicload(&runtime·sched.nmspinning) == 0)
wakep();
}
連續(xù)棧的地位被正式確定下來,Go 1.4 已經(jīng)移除了分段棧代碼。
相比較分段棧,連續(xù)棧結(jié)構(gòu)更簡(jiǎn)單,實(shí)現(xiàn)算法也更加簡(jiǎn)潔。除棧開始、結(jié)束地址外,只需維護(hù)一個(gè)用于溢出檢查的指針即可。
runtime.h
struct Stack
{
uintptr lo;
uintptr hi;
};
struct G
{
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a
// preemption.
Stack stack;
uintptr stackguard0;
}
結(jié)構(gòu)示意圖:
在 stack.h 頭部棧結(jié)構(gòu)布局說明中有對(duì) StackGuard 的詳細(xì)解釋。
stack.h
/*
The per-goroutine g->stackguard is set to point StackGuard bytes
above the bottom of the stack. Each function compares its stack
pointer against g->stackguard to check for overflow. To cut one
instruction from the check sequence for functions with tiny frames,
the stack is allowed to protrude StackSmall bytes below the stack
guard. Functions with large frames don't bother with the check and
always call morestack.
*/
StackGuard = 512 + StackSystem
在經(jīng)過幾個(gè)版本的反復(fù)調(diào)整后,棧默認(rèn)大小又回到 2048 字節(jié),這是個(gè)利好消息。
stack.h
StackMin = 2048,
proc.c
G* runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
if((newg = gfget(p)) == nil) {
newg = runtime·malg(StackMin);
}
}
和內(nèi)存分配器的的做法類似,調(diào)度器會(huì)在 cache 上緩存 stack 對(duì)象。
malloc.h
// Number of orders that get caching. Order 0 is FixedStack
// and each successive order is twice as large.
NumStackOrders = 3,
runtime.h
struct StackFreeList
{
MLink *list; // linked list of free stacks
uintptr size; // total size of stacks in list
};
struct MCache
{
StackFreeList stackcache[NumStackOrders];
};
被緩存的 stack 依據(jù)大小分成 3 種 order,這與 FixedStack 值有很大關(guān)系。
stack.h
#ifdef GOOS_windows
StackSystem = 512 * sizeof(uintptr),
#else
#ifdef GOOS_plan9
StackSystem = 512,
#else
StackSystem = 0,
#endif // Plan 9
#endif // Windows
// The minimum stack size to allocate.
// The hackery here rounds FixedStack0 up to a power of 2.
FixedStack0 = StackMin + StackSystem,
FixedStack1 = FixedStack0 - 1,
FixedStack2 = FixedStack1 | (FixedStack1 >> 1),
FixedStack3 = FixedStack2 | (FixedStack2 >> 2),
FixedStack4 = FixedStack3 | (FixedStack3 >> 4),
FixedStack5 = FixedStack4 | (FixedStack4 >> 8),
FixedStack6 = FixedStack5 | (FixedStack5 >> 16),
FixedStack = FixedStack6 + 1,
對(duì) Linux、darwin 等系統(tǒng)而言,F(xiàn)ixedStack 值和 StackMin 相同。 因此被緩存 stack大小分別是:
FiexedStack = 2048
FixedStack << order 0 = 2048
FixedStack << order 1 = 4096
FixedStack << order 2 = 8192
在確定相關(guān)結(jié)構(gòu)和參數(shù)后,看看 stack 具體的分配過程。
malloc.h
StackCacheSize = 32*1024, // Per-P, per order stack segment cache size.
stack.c
Stack runtime·stackalloc(uint32 n)
{
// 從復(fù)用鏈表分配,或直接從 heap.span 分配。
if(StackCache && n < FixedStack << NumStackOrders && n < StackCacheSize) {
// 計(jì)算對(duì)應(yīng)的 stack order。
order = 0;
n2 = n;
while(n2 > FixedStack) {
order++;
n2 >>= 1;
}
c = g->m->mcache;
if(c == nil || g->m->gcing || g->m->helpgc) {
// 從全局緩存分配。
x = poolalloc(order);
} else {
// 從本地 cache 分配。
x = c->stackcache[order].list;
// 如果本地沒有復(fù)用 stack,則從全局緩存轉(zhuǎn)移一批過來。
if(x == nil) {
stackcacherefill(c, order);
x = c->stackcache[order].list;
}
// 調(diào)整鏈表。
c->stackcache[order].list = x->next;
c->stackcache[order].size -= n;
}
v = (byte*)x;
} else {
// 直接從 heap.spans 分配。
s = runtime·MHeap_AllocStack(&runtime·mheap, ROUND(n, PageSize) >> PageShift);
v = (byte*)(s->start<<PageShift);
}
return (Stack){(uintptr)v, (uintptr)v+n};
}
整個(gè)過程和從 cache 分配 object 如出一轍。而在釋放時(shí),調(diào)度器會(huì)主動(dòng)將過多的復(fù)用對(duì)象從本地轉(zhuǎn)移到全局緩存。
stack.c
void runtime·stackfree(Stack stk)
{
n = stk.hi - stk.lo;
v = (void*)stk.lo;
if(StackCache && n < FixedStack << NumStackOrders && n < StackCacheSize) {
// 計(jì)算 order。
order = 0;
n2 = n;
while(n2 > FixedStack) {
order++;
n2 >>= 1;
}
x = (MLink*)v;
c = g->m->mcache;
if(c == nil || g->m->gcing || g->m->helpgc) {
// 歸還給全局緩存。
poolfree(x, order);
} else {
// 如果本地緩存超出容量限制,則歸還一批給全局緩存。
if(c->stackcache[order].size >= StackCacheSize)
stackcacherelease(c, order);
// 添加到 cache 本地鏈表。
x->next = c->stackcache[order].list;
c->stackcache[order].list = x;
c->stackcache[order].size += n;
}
} else {
// 歸還給 heap。
s = runtime·MHeap_Lookup(&runtime·mheap, v);
runtime·MHeap_FreeStack(&runtime·mheap, s);
}
}
全局緩存池基本上就是對(duì) span 鏈表的操作,類似做法在內(nèi)存分配器章節(jié)早已見過。
stack.c
MSpan runtime·stackpool[NumStackOrders]; // 全局緩存。
void runtime·stackinit(void)
{
for(i = 0; i < NumStackOrders; i++)
runtime·MSpanList_Init(&runtime·stackpool[i]);
}
static MLink* poolalloc(uint8 order)
{
MSpan *list;
MSpan *s;
MLink *x;
list = &runtime·stackpool[order];
s = list->next;
if(s == list) {
// 如果沒有 stack 可用,則從 heap 獲取一個(gè) span。
s = runtime·MHeap_AllocStack(&runtime·mheap, StackCacheSize >> PageShift);
// 切分。
for(i = 0; i < StackCacheSize; i += FixedStack << order) {
x = (MLink*)((s->start << PageShift) + i);
x->next = s->freelist;
s->freelist = x;
}
// 插入鏈表。
runtime·MSpanList_Insert(list, s);
}
x = s->freelist;
s->freelist = x->next;
s->ref++;
if(s->freelist == nil) {
// all stacks in s are allocated.
runtime·MSpanList_Remove(s);
}
return x;
}
static void poolfree(MLink *x, uint8 order)
{
MSpan *s;
s = runtime·MHeap_Lookup(&runtime·mheap, x);
if(s->freelist == nil) {
// 有對(duì)象歸還,自然要重新放回復(fù)用鏈表中。
runtime·MSpanList_Insert(&runtime·stackpool[order], s);
}
x->next = s->freelist;
s->freelist = x;
s->ref--;
// 如果該 span 內(nèi)存被全部收回,還給 heap。
if(s->ref == 0) {
runtime·MSpanList_Remove(s);
s->freelist = nil;
runtime·MHeap_FreeStack(&runtime·mheap, s);
}
}
相關(guān)的批量轉(zhuǎn)移和歸還操作也沒什么值得深究的。
stack.c
static void stackcacherefill(MCache *c, uint8 order)
{
MLink *x, *list;
uintptr size;
// Grab some stacks from the global cache.
// Grab half of the allowed capacity (to prevent thrashing).
list = nil;
size = 0;
while(size < StackCacheSize/2) {
x = poolalloc(order);
x->next = list;
list = x;
size += FixedStack << order;
}
c->stackcache[order].list = list;
c->stackcache[order].size = size;
}
static void stackcacherelease(MCache *c, uint8 order)
{
MLink *x, *y;
uintptr size;
x = c->stackcache[order].list;
size = c->stackcache[order].size;
while(size > StackCacheSize/2) {
y = x->next;
poolfree(x, order);
x = y;
size -= FixedStack << order;
}
c->stackcache[order].list = x;
c->stackcache[order].size = size;
}
連續(xù)棧的調(diào)整行為是由編譯器偷偷完成的。反匯編可執(zhí)行文件,你會(huì)看到編譯器會(huì)在函數(shù)頭部插入 morestack 調(diào)用,這是運(yùn)行時(shí)檢查棧內(nèi)存的關(guān)鍵。
(gdb) disass
Dump of assembler code for function main.main:
0x000000000000207f <+15>:" call 0x2ddf0 <runtime.morestack_noctxt>
asm_amd64.s
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
MOVL $0, DX
JMP runtime·morestack(SB)
TEXT runtime·morestack(SB),NOSPLIT,$0-0
MOVQ (g_sched+gobuf_sp)(BP), SP
CALL runtime·newstack(SB)
當(dāng)需要調(diào)整棧大小時(shí),會(huì)調(diào)用 newstack 完成連續(xù)棧的重新分配。
stack.c
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
void runtime·newstack(void)
{
gp = g->m->curg;
// 修改狀態(tài)。
runtime·casgstatus(gp, Grunning, Gwaiting);
gp->waitreason = runtime·gostringnocopy((byte*)"stack growth");
// 調(diào)整執(zhí)行現(xiàn)場(chǎng)參數(shù)。
runtime·rewindmorestack(&gp->sched);
// 新棧需要 2 倍空間。
oldsize = gp->stack.hi - gp->stack.lo;
newsize = oldsize * 2;
// 分配新 stack,并將數(shù)據(jù)拷貝到新站。
copystack(gp, newsize);
// 恢復(fù)狀態(tài),繼續(xù)執(zhí)行。
runtime·casgstatus(gp, Gwaiting, Grunning);
runtime·gogo(&gp->sched);
}
與分段棧相比,連續(xù)棧核心算法 copystack 非常簡(jiǎn)單易懂。
stack.c
static void copystack(G *gp, uintptr newsize)
{
old = gp->stack;
used = old.hi - gp->sched.sp;
// 創(chuàng)建新棧。
new = runtime·stackalloc(newsize);
// ... 一些棧內(nèi)容調(diào)整操作 ...
// 拷貝棧數(shù)據(jù)。
runtime·memmove((byte*)new.hi - used, (byte*)old.hi - used, used);
// 切換到新棧。
gp->stack = new;
gp->stackguard0 = new.lo + StackGuard;
gp->sched.sp = new.hi - used;
// 釋放舊棧。
if(newsize > old.hi-old.lo) {
// 擴(kuò)張, 立即釋放。
runtime·stackfree(old);
} else {
// 收縮操作有點(diǎn)復(fù)雜,因?yàn)樵瓧I系哪承?shù)據(jù)可能對(duì)垃圾回收器有用。
// 放到一個(gè)臨時(shí)隊(duì)列,等待垃圾回收器處理。
*(Stack*)old.lo = stackfreequeue;
stackfreequeue = old;
}
}
垃圾回收器會(huì)清理所有緩存,釋放掉臨時(shí)存儲(chǔ)的 stack,并收縮棧內(nèi)存。
mgc0.c
static void gc(struct gc_args *args)
{
runtime·shrinkfinish();
}
static void markroot(ParFor *desc, uint32 i)
{
switch(i) {
case RootData:
...
case RootFlushCaches:
flushallmcaches(); // 清理 cache。
break;
default:
gp = runtime·allg[i - RootCount];
runtime·shrinkstack(gp); // 收縮棧內(nèi)存。
break;
}
}
static void flushallmcaches(void)
{
// Flush MCache's to MCentral.
for(pp=runtime·allp; p=*pp; pp++) {
c = p->mcache;
runtime·MCache_ReleaseAll(c);
runtime·stackcache_clear(c); // 釋放 cache 里緩存的 stack。
}
}
stack.c
static Stack stackfreequeue;
// 清理臨時(shí) stack。
void runtime·shrinkfinish(void)
{
s = stackfreequeue;
stackfreequeue = (Stack){0,0};
while(s.lo != 0) {
t = *(Stack*)s.lo;
runtime·stackfree(s);
s = t;
}
}
// 收縮棧內(nèi)存。
void runtime·shrinkstack(G *gp)
{
oldsize = gp->stack.hi - gp->stack.lo;
newsize = oldsize / 2;
if(newsize < FixedStack)
return; // don't shrink below the minimum-sized stack
used = gp->stack.hi - gp->sched.sp;
if(used >= oldsize / 4)
return; // still using at least 1/4 of the segment.
copystack(gp, newsize);
}
// 釋放所有 cache 持有的 stack 緩存。
void runtime·stackcache_clear(MCache *c)
{
for(order = 0; order < NumStackOrders; order++) {
x = c->stackcache[order].list;
while(x != nil) {
y = x->next;
poolfree(x, order);
x = y;
}
c->stackcache[order].list = nil;
c->stackcache[order].size = 0;
}
}
調(diào)度器完成 G 任務(wù)后,會(huì)將其放回復(fù)用列表,并釋放掉額外分配的棧內(nèi)存。
proc.c
static void gfput(P *p, G *gp)
{
stksize = gp->stack.hi - gp->stack.lo;
// 如果不是默認(rèn)棧,釋放。
if(stksize != FixedStack) {
runtime·stackfree(gp->stack);
gp->stack.lo = 0;
gp->stack.hi = 0;
gp->stackguard0 = 0;
}
}
還有,在減少 P 數(shù)量時(shí),會(huì)釋放不再使用的關(guān)聯(lián) cache,這也會(huì)引發(fā) stack 清理操作。
proc.c
static void procresize(int32 new)
{
// free unused P's
for(i = new; i < old; i++) {
p = runtime·allp[i];
runtime·freemcache(p->mcache);
}
}
mcache.c
static void freemcache(MCache *c)
{
runtime·stackcache_clear(c);
}
官方一直在宣傳連續(xù)棧的好處,但實(shí)際性能表現(xiàn)和具體場(chǎng)景有關(guān),并非處處適宜。另外,緩存對(duì)象的確可以提升性能,但過多的緩存對(duì)象放在復(fù)用鏈表中,卻成為浪費(fèi)和負(fù)擔(dān)。興許以后的版本會(huì)有更好的表現(xiàn)。
為支持并發(fā)調(diào)度,專門對(duì) syscall、cgo 等操作進(jìn)行包裝,以便在長(zhǎng)時(shí)間阻塞時(shí)能切換執(zhí)行其他任務(wù)。
src/syscall/asm_linux_amd64.s
TEXT ·Syscall(SB),NOSPLIT,$0-56
CALL runtime·entersyscall(SB)
MOVQ 16(SP), DI
MOVQ 24(SP), SI
MOVQ 32(SP), DX
MOVQ $0, R10
MOVQ $0, R8
MOVQ $0, R9
MOVQ 8(SP), AX // syscall entry
SYSCALL
CMPQ AX, $0xfffffffffffff001
JLS ok
MOVQ $-1, 40(SP) // r1
MOVQ $0, 48(SP) // r2
NEGQ AX
MOVQ AX, 56(SP) // errno
CALL runtime·exitsyscall(SB)
RET
ok:
MOVQ AX, 40(SP) // r1
MOVQ DX, 48(SP) // r2
MOVQ $0, 56(SP) // errno
CALL runtime·exitsyscall(SB)
RET
cgocall.go
func cgocall(fn, arg unsafe.Pointer) {
cgocall_errno(fn, arg)
}
func cgocall_errno(fn, arg unsafe.Pointer) int32 {
entersyscall()
errno := asmcgocall_errno(fn, arg)
exitsyscall()
return errno
}
進(jìn)入系統(tǒng)調(diào)用前保存執(zhí)行現(xiàn)場(chǎng),這是任務(wù)切換的關(guān)鍵。
proc.c
void ·entersyscall(int32 dummy)
{
runtime·reentersyscall((uintptr)runtime·getcallerpc(&dummy),
runtime·getcallersp(&dummy));
}
void runtime·reentersyscall(uintptr pc, uintptr sp)
{
// 保存現(xiàn)場(chǎng)。
save(pc, sp);
g->syscallsp = sp;
g->syscallpc = pc;
runtime·casgstatus(g, Grunning, Gsyscall);
// 喚醒 sysmon 線程。
if(runtime·atomicload(&runtime·sched.sysmonwait)) {
fn = entersyscall_sysmon;
runtime·onM(&fn);
save(pc, sp);
}
// 解除任務(wù)關(guān)聯(lián)引用。
g->m->mcache = nil;
g->m->p->m = nil;
runtime·atomicstore(&g->m->p->status, Psyscall);
}
static void save(uintptr pc, uintptr sp)
{
g->sched.pc = pc;
g->sched.sp = sp;
g->sched.lr = 0;
g->sched.ret = 0;
g->sched.ctxt = 0;
g->sched.g = g;
}
必須確保 sysmon 線程運(yùn)行,如此才能在長(zhǎng)時(shí)間阻塞時(shí),回收其關(guān)聯(lián) P 執(zhí)行其他任務(wù)。
proc.c
static void entersyscall_sysmon(void)
{
// 喚醒 sysmon M。
if(runtime·atomicload(&runtime·sched.sysmonwait)) {
runtime·atomicstore(&runtime·sched.sysmonwait, 0);
runtime·notewakeup(&runtime·sched.sysmonnote);
}
}
另有 entersyscallblock 會(huì)主動(dòng)釋放 P,用于執(zhí)行可確定的長(zhǎng)時(shí)間阻塞調(diào)用。
proc.c
void ·entersyscallblock(int32 dummy)
{
save((uintptr)runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
g->syscallsp = g->sched.sp;
g->syscallpc = g->sched.pc;
runtime·casgstatus(g, Grunning, Gsyscall);
// 釋放關(guān)聯(lián) P。
fn = entersyscallblock_handoff;
runtime·onM(&fn);
save((uintptr)runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
}
static void entersyscallblock_handoff(void)
{
// 釋放 P,讓其執(zhí)行其他任務(wù)。
handoffp(releasep());
}
從系統(tǒng)調(diào)用退出時(shí),優(yōu)先檢查關(guān)聯(lián) P 是否還在。
proc.c
void ·exitsyscall(int32 dummy)
{
// 如果能關(guān)聯(lián) P。
if(exitsyscallfast()) {
runtime·casgstatus(g, Gsyscall, Grunning);
return;
}
fn = exitsyscall0;
runtime·mcall(&fn);
}
static bool exitsyscallfast(void)
{
// 如果關(guān)聯(lián) P 扔在,嘗試重新關(guān)聯(lián)。
if(g->m->p && g->m->p->status == Psyscall &&
runtime·cas(&g->m->p->status, Psyscall, Prunning)) {
g->m->mcache = g->m->p->mcache;
g->m->p->m = g->m;
return true;
}
// 嘗試關(guān)聯(lián)空閑 P。
g->m->p = nil;
if(runtime·sched.pidle) {
fn = exitsyscallfast_pidle;
runtime·onM(&fn);
if(g->m->scalararg[0]) {
g->m->scalararg[0] = 0;
return true;
}
}
return false;
}
static void exitsyscallfast_pidle(void)
{
p = pidleget();
if(p) {
acquirep(p);
}
}
如快速退出失敗,且無法獲取可用 P,那只能將當(dāng)前 G 任務(wù)放回待運(yùn)行隊(duì)列。
proc.c
static void exitsyscall0(G *gp)
{
runtime·casgstatus(gp, Gsyscall, Grunnable);
// 獲取空閑 P。
p = pidleget();
// 如獲取 P 失敗,將當(dāng)前 G 放回全局隊(duì)列。
if(p == nil)
globrunqput(gp);
// 關(guān)聯(lián) P,繼續(xù)執(zhí)行。
if(p) {
acquirep(p);
execute(gp); // Never returns.
}
// 關(guān)聯(lián)失敗,休眠當(dāng)前 M。
stopm();
schedule(); // Never returns.
}
注:以 Raw 開頭的函數(shù)不使用包裝模式。
調(diào)度器使用專門線程跑系統(tǒng)監(jiān)控,主動(dòng)完成那些長(zhǎng)時(shí)間沒有觸發(fā)的事件。
// The main goroutine.
func main() {
onM(newsysmon)
}
proc.c
void runtime·newsysmon(void)
{
// 啟動(dòng)獨(dú)立線程運(yùn)行 sysmon。
newm(sysmon, nil);
}
監(jiān)控函數(shù) sysmon 循環(huán)運(yùn)行所有檢查任務(wù)。
proc.c
static void sysmon(void)
{
// If we go two minutes without a garbage collection, force one to run.
forcegcperiod = 2*60*1e9;
// If a heap span goes unused for 5 minutes after a garbage collection,
// we hand it back to the operating system.
scavengelimit = 5*60*1e9;
// Make wake-up period small enough for the sampling to be correct.
maxsleep = forcegcperiod/2;
if(scavengelimit < forcegcperiod)
maxsleep = scavengelimit/2;
for(;;) {
if(idle == 0) // start with 20us sleep...
delay = 20;
else if(idle > 50) // start doubling the sleep after 1ms...
delay *= 2;
if(delay > 10*1000) // up to 10ms
delay = 10*1000;
// 根據(jù) idle 調(diào)整循環(huán)暫停時(shí)間。
runtime·usleep(delay);
// 如垃圾回收啟動(dòng),休眠 sysmon 線程。
if(runtime·debug.schedtrace <= 0 && (runtime·sched.gcwaiting ...)) {
if(runtime·atomicload(&runtime·sched.gcwaiting) || ...) {
// 設(shè)置標(biāo)志,休眠一段時(shí)間。
runtime·atomicstore(&runtime·sched.sysmonwait, 1);
runtime·notetsleep(&runtime·sched.sysmonnote, maxsleep);
// 喚醒后清除等待標(biāo)志。
runtime·atomicstore(&runtime·sched.sysmonwait, 0);
runtime·noteclear(&runtime·sched.sysmonnote);
idle = 0;
delay = 20;
}
}
// 如超過 10ms 沒處理 netpoll,立即獲取,并添加到任務(wù)隊(duì)列。
lastpoll = runtime·atomicload64(&runtime·sched.lastpoll);
if(lastpoll != 0 && lastpoll + 10*1000*1000 < now) {
runtime·cas64(&runtime·sched.lastpoll, lastpoll, now);
gp = runtime·netpoll(false); // non-blocking
if(gp) {
injectglist(gp);
}
}
// 收回因系統(tǒng)調(diào)用長(zhǎng)時(shí)間阻塞的 P。
// 向長(zhǎng)時(shí)間運(yùn)行的 G 任務(wù)發(fā)出搶占調(diào)度通知。
if(retake(now))
idle = 0;
else
idle++;
// 如超過 2 分鐘未做垃圾回收,強(qiáng)制啟動(dòng)。
lastgc = runtime·atomicload64(&mstats.last_gc);
if(lastgc != 0 && unixnow - lastgc > forcegcperiod && ...) {
// 將 forcegc.G 放回任務(wù)隊(duì)列,使其運(yùn)行。
injectglist(runtime·forcegc.g);
}
// 釋放長(zhǎng)時(shí)間閑置 span 物理內(nèi)存。
if(lastscavenge + scavengelimit/2 < now) {
runtime·MHeap_Scavenge(nscavenge, now, scavengelimit);
lastscavenge = now;
}
}
}
forcegc 和 scavenge 前面都已說過。retake 使用計(jì)數(shù)器判斷 syscall 或 G 任務(wù)的運(yùn)行時(shí)間。
proc.c
struct Pdesc
{
uint32 schedtick; // scheduler execute 執(zhí)行次數(shù)。
int64 schedwhen;
uint32 syscalltick; // syscall 執(zhí)行次數(shù),在 exitsyscall 結(jié)束前遞增。
int64 syscallwhen;
};
static Pdesc pdesc[MaxGomaxprocs];
static uint32 retake(int64 now)
{
n = 0;
// 循環(huán)檢查所有 P。
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p==nil) continue;
pd = &pdesc[i];
s = p->status;
if(s == Psyscall) {
// 如果和 pdesc 中的計(jì)數(shù)不等,表示啟動(dòng)了新 syscall,刷新計(jì)數(shù)器。
// 再次 retake 時(shí),如計(jì)數(shù)依然相等,表示依然阻塞在上次 syscall 中,
// 時(shí)間起碼超過一次 sysmon sleep (最少 20us)。
t = p->syscalltick;
if(pd->syscalltick != t) {
pd->syscalltick = t;
pd->syscallwhen = now;
continue;
}
// 如 P 沒有其他任務(wù),且沒超過 10ms,跳過。
if(p->runqhead == p->runqtail &&
runtime·atomicload(&runtime·sched.nmspinning) +
runtime·atomicload(&runtime·sched.npidle) > 0 &&
pd->syscallwhen + 10*1000*1000 > now)
continue;
// 收回被 syscall 阻塞的 P,用于執(zhí)行其他任務(wù)。
if(runtime·cas(&p->status, s, Pidle)) {
n++;
handoffp(p);
}
} else if(s == Prunning) {
// 計(jì)數(shù)不等,表示啟動(dòng)新 G 任務(wù)執(zhí)行,刷新計(jì)數(shù)器。
// 再次 retake 時(shí),如計(jì)數(shù)依然相等,表示該任務(wù)執(zhí)行時(shí)間超過一次 sysmon sleep 間隔。
t = p->schedtick;
if(pd->schedtick != t) {
pd->schedtick = t;
pd->schedwhen = now;
continue;
}
// 檢查超時(shí) (10ms)。
if(pd->schedwhen + 10*1000*1000 > now)
continue;
// 設(shè)置搶占調(diào)度標(biāo)記。
preemptone(p);
}
}
return n;
}
前面說過 entersyscall 會(huì)保存現(xiàn)場(chǎng),解除引用,因此 sysmon 可以安全拿回 P。調(diào)度器會(huì)積極嘗試讓這個(gè) P 跑起來,這是它的責(zé)任。
proc.c
static void handoffp(P *p)
{
// if it has local work, start it straight away
if(p->runqhead != p->runqtail || runtime·sched.runqsize) {
startm(p, false);
return;
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
if(runtime·atomicload(&runtime·sched.nmspinning) +
runtime·atomicload(&runtime·sched.npidle) == 0 &&
runtime·cas(&runtime·sched.nmspinning, 0, 1)){
startm(p, true);
return;
}
// gc
if(runtime·sched.gcwaiting) {
p->status = Pgcstop;
if(--runtime·sched.stopwait == 0)
runtime·notewakeup(&runtime·sched.stopnote);
return;
}
if(runtime·sched.runqsize) {
startm(p, false);
return;
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
if(runtime·sched.npidle == runtime·gomaxprocs-1 &&
runtime·atomicload64(&runtime·sched.lastpoll) != 0) {
startm(p, false);
return;
}
pidleput(p);
}
至于搶占調(diào)度通知不過是在 G 棧上設(shè)置一個(gè)標(biāo)志。類似操作,在很多地方都能看到。
proc.c
// Tell the goroutine running on processor P to stop.
static bool preemptone(P *p)
{
mp = p->m;
if(mp == nil || mp == g->m)
return false;
gp = mp->curg;
if(gp == nil || gp == mp->g0)
return false;
gp->preempt = true;
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp->stackguard0 = StackPreempt;
return true;
}
實(shí)際的調(diào)度行為由編譯器插入到函數(shù)頭部的 morestack 引發(fā)。
asm_amd64.s
TEXT runtime·morestack(SB),NOSPLIT,$0-0
MOVQ (g_sched+gobuf_sp)(BP), SP
CALL runtime·newstack(SB)
stack.c
void runtime·newstack(void)
{
if(gp->stackguard0 == (uintptr)StackPreempt) {
// Act like goroutine called runtime.Gosched.
runtime·casgstatus(gp, Gwaiting, Grunning);
runtime·gosched_m(gp); // never return
}
}
可見搶占調(diào)度的前提是執(zhí)行其他非內(nèi)聯(lián)函數(shù)。如果任務(wù)跑沒有函數(shù)調(diào)用的無限循環(huán),那么 M/P 就會(huì)被一直霸占,最慘的是 GOMAXPROCS = 1,其他任務(wù)都會(huì)餓死。
使用用環(huán)境變量 GODEBUG="schedtrace=xxx" 輸出調(diào)度器跟蹤信息。
更詳細(xì)的信息,需指定 "scheddetail=1"。
$ GOMAXPROCS=2 GODEBUG="schedtrace=1000,scheddetail=1" ./test
SCHED 1002ms: gomaxprocs=2 idleprocs=0 threads=3 idlethreads=0 runqueue=0 ...
P0: status=1 schedtick=4 syscalltick=3 m=0 runqsize=51 gfreecnt=0
P1: status=1 schedtick=5 syscalltick=0 m=2 runqsize=50 gfreecnt=0
M2: p=1 curg=10 mallocing=0 throwing=0 gcing=0 locks=0 dying=0 helpgc=0 ...
M1: p=-1 curg=-1 mallocing=0 throwing=0 gcing=0 locks=1 dying=0 helpgc=0 ...
M0: p=0 curg=9 mallocing=0 throwing=0 gcing=0 locks=0 dying=0 helpgc=0 ...
G1: status=4(sleep) m=-1 lockedm=-1
G2: status=1() m=-1 lockedm=-1
G3: status=1() m=-1 lockedm=-1
相關(guān)代碼請(qǐng)參考 proc.c/runtime·schedtrace 函數(shù)。
Channel 是 Go 實(shí)現(xiàn) CSP 模型的關(guān)鍵,鼓勵(lì)用通訊來實(shí)現(xiàn)共享。
在具體實(shí)現(xiàn)上,類似 FIFO 隊(duì)列,多個(gè) G 排隊(duì)等待收發(fā)操作。同步模式,從排隊(duì)鏈表中獲取一個(gè)能與之交換數(shù)據(jù)的對(duì)象;異步模式,圍繞數(shù)據(jù)緩沖區(qū)空位排隊(duì)。
先了解幾個(gè)基本的數(shù)據(jù)類型。
SudoG 對(duì) G 進(jìn)行包裝,而 WaitQ 則是 SudoG 排隊(duì)鏈表。
runtime.h
struct SudoG
{
G* g;
uint32* selectdone;
SudoG* next;
SudoG* prev;
void* elem; // 發(fā)送或接收的數(shù)據(jù)。
int64 releasetime;
int32 nrelease; // -1 for acquire
SudoG* waitlink; // G.waiting list
};
chan.h
struct WaitQ
{
SudoG* first;
SudoG* last;
};
每個(gè) channel 除發(fā)送和接收排隊(duì)鏈表外,還由一個(gè)環(huán)狀數(shù)據(jù)緩沖槽隊(duì)列。
chan.h
struct Hchan
{
uintgo qcount; // 緩沖數(shù)據(jù)項(xiàng)數(shù)量。
uintgo dataqsiz; // 緩沖槽數(shù)量。
byte* buf; // 緩沖區(qū)指針。
uint16 elemsize; // 數(shù)據(jù)項(xiàng)長(zhǎng)度。
uint32 closed; // 關(guān)閉標(biāo)記。
Type* elemtype; // 數(shù)據(jù)項(xiàng)類型。
uintgo sendx; // 發(fā)送索引。
uintgo recvx; // 接收索引。
WaitQ recvq; // 等待接收 G 排隊(duì)鏈表。
WaitQ sendq; // 等待發(fā)送 G 排隊(duì)鏈表。
Mutex lock;
};
創(chuàng)建 channel 對(duì)象時(shí),需要指定緩沖槽數(shù)量,同步模式為 0。
chan.go
const (
hchanSize = unsafe.Sizeof(hchan{}) +
uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// 數(shù)據(jù)項(xiàng)長(zhǎng)度不能超過 64KB。
if elem.size >= 1<<16 {
gothrow("makechan: invalid channel element type")
}
var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// 一次性分配 channel 和緩沖區(qū)內(nèi)存。
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*uintptr(elem.size), ...))
// 調(diào)整緩沖區(qū)指針。
if size > 0 && elem.size != 0 {
c.buf = (*uint8)(add(unsafe.Pointer(c), hchanSize))
} else {
c.buf = (*uint8)(unsafe.Pointer(c))
}
} else {
// 如果數(shù)據(jù)項(xiàng)是指針,單獨(dú)分配一個(gè)指針數(shù)組作為緩沖區(qū)。
c = new(hchan)
c.buf = (*uint8)(newarray(elem, uintptr(size)))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
同步和異步實(shí)現(xiàn)算法有很大差異。但不知什么原因,運(yùn)行時(shí)開發(fā)人員硬將這些塞到同一個(gè)函數(shù)里。為閱讀方便,我們將其拆開說明。
同步收發(fā)操作的關(guān)鍵,是從排隊(duì)鏈表里找到一個(gè)合作者。找到,直接私下交換數(shù)據(jù);找不到,把自己打包成 SudoG,放到排隊(duì)鏈表里,然后休眠,直到被另一方喚醒。
參數(shù) ep 表示待發(fā)送或接收數(shù)據(jù)內(nèi)存指針。
chan.go
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, ...) bool {
lock(&c.lock)
// --- 同步模式 ------------------------------------------------------
if c.dataqsiz == 0 {
// 從接收排隊(duì)鏈表查找接收者。
sg := c.recvq.dequeue()
if sg != nil {
// 找到合作者以后,就不再需要 channel 參與。
unlock(&c.lock)
recvg := sg.g
// 將數(shù)據(jù)拷貝給接收者。
if sg.elem != nil {
memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.elemsize))
sg.elem = nil
}
// 將準(zhǔn)備喚醒的接收者 SudoG 保存到目標(biāo) G.param。
// 同步方式的參與雙方通過檢查該參數(shù)來確定是被另一方喚醒。
// 另外,channel select 用該參數(shù)獲知可用的 SudoG。
// 異步方式僅關(guān)心緩沖槽,并不需要有另一方配合,因此無需填寫該參數(shù)。
recvg.param = unsafe.Pointer(sg)
// 喚醒接收者。
// 注意,此時(shí)接收者已經(jīng)離開排隊(duì)鏈表,而且數(shù)據(jù)已經(jīng)完成拷貝。
goready(recvg)
return true
}
// 如果找不到接收者,那么打包成 SudoG。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
gp.param = nil
// 放到發(fā)送者排隊(duì)鏈表、阻塞。
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send")
// 被喚醒。檢查 param 參數(shù),確定是被接收者而不是 close 喚醒。
// 被喚醒前,數(shù)據(jù)已經(jīng)被接收者拷貝完成。
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
gothrow("chansend: spurious wakeup")
}
panic("send on closed channel")
}
gp.param = nil
releaseSudog(mysg)
return true
}
return true
}
同步接收和同步發(fā)送流程基本一致。
chan.go
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, ...) (selected, received bool) {
lock(&c.lock)
// --- 同步模式 ------------------------------------------------------
if c.dataqsiz == 0 {
// 從發(fā)送排隊(duì)鏈表找出一個(gè)發(fā)送者。
sg := c.sendq.dequeue()
if sg != nil {
// 撇開 channel,私下交易。
unlock(&c.lock)
// 拷貝數(shù)據(jù)。
if ep != nil {
memmove(ep, sg.elem, uintptr(c.elemsize))
}
sg.elem = nil
gp := sg.g
// 設(shè)置喚醒檢查參數(shù),喚醒發(fā)送者。
// 喚醒前,數(shù)據(jù)已經(jīng)完成拷貝。
gp.param = unsafe.Pointer(sg)
goready(gp)
selected = true
received = true
return
}
// 如果沒有發(fā)送者,打包成 SudoG。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
gp.param = nil
// 放到接收排隊(duì)鏈表、阻塞。
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive")
// 檢查是否被發(fā)送者喚醒,以確定數(shù)據(jù)可用。close 喚醒肯定不會(huì)有數(shù)據(jù)。
// 喚醒前,發(fā)送者已經(jīng)將數(shù)據(jù)拷貝到 ep。
haveData := gp.param != nil
gp.param = nil
releaseSudog(mysg)
if haveData {
selected = true
received = true
return
}
lock(&c.lock)
return recvclosed(c, ep)
}
}
簡(jiǎn)單點(diǎn)說,要么主動(dòng)找到合作方,要么去排隊(duì)等著被動(dòng)喚醒,這是一個(gè)雙向過程。
SudoG 會(huì)被緩存到 cache,這個(gè)沒什么需要特別說明的。
malloc.h
struct MCache
{
SudoG* sudogcache;
};
proc.go
func acquireSudog() *sudog {
c := gomcache()
s := c.sudogcache
if s != nil {
c.sudogcache = s.next
s.next = nil
return s
}
mp := acquirem()
p := new(sudog)
releasem(mp)
return p
}
func releaseSudog(s *sudog) {
gp := getg()
c := gomcache()
s.next = c.sudogcache
c.sudogcache = s
}
同步關(guān)鍵是查找合作方,而異步關(guān)鍵則是緩沖區(qū)空槽。channel 使用 qcount、sendx、recvx 來維護(hù)一個(gè)環(huán)狀緩沖隊(duì)列。
chan.go
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, ...) bool {
lock(&c.lock)
// --- 異步模式 ------------------------------------------------------
// 如果緩沖區(qū)沒有空槽。
for c.qcount >= c.dataqsiz {
// 打包成 SudoG。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.g = gp
mysg.elem = nil
mysg.selectdone = nil
// 放入發(fā)送隊(duì)列,休眠。
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send")
// 被喚醒,釋放 SudoG。
// 被喚醒前,SudoG 已經(jīng)被某個(gè)接收者彈出排隊(duì)鏈表。
releaseSudog(mysg)
// ... 循環(huán)重試 ...
}
// 有空槽,直接將數(shù)據(jù)拷貝到緩沖區(qū)。
memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize))
// 調(diào)整緩沖區(qū)參數(shù)。
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
// 發(fā)送操作完成。
// 緩沖區(qū)有了可用數(shù)據(jù),嘗試將某個(gè)接收者從排隊(duì)鏈表彈出,喚醒它處理數(shù)據(jù)。
sg := c.recvq.dequeue()
if sg != nil {
recvg := sg.g
unlock(&c.lock)
goready(recvg)
}
return true
}
將數(shù)據(jù)拷貝到緩沖區(qū),然后嘗試去喚醒某個(gè)接收者處理。同樣,接收者如果找不到可用的緩存數(shù)據(jù),會(huì)將自己放到排隊(duì)鏈表,等待某個(gè)發(fā)送者寫入數(shù)據(jù)后喚醒。
chan.go
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, ...) (selected, received bool) {
lock(&c.lock)
// --- 異步模式 ------------------------------------------------------
// 如果沒有可用緩存數(shù)據(jù)。
for c.qcount <= 0 {
// 打包成 SudoG.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = nil
mysg.g = gp
mysg.selectdone = nil
// 放到接收排隊(duì)鏈表、阻塞。
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive")
// 被喚醒。
// 被喚醒前,SudoG 已經(jīng)被彈出排隊(duì)鏈表。
releaseSudog(mysg)
lock(&c.lock)
// ... 循環(huán)重試 ...
}
// 如果有可用數(shù)據(jù)項(xiàng),直接拷貝到 ep。
if ep != nil {
memmove(ep, chanbuf(c, c.recvx), uintptr(c.elemsize))
}
// 清除緩沖槽,調(diào)整緩沖區(qū)參數(shù)。
memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
// 接收完成,表示有空槽,嘗試喚醒某個(gè)發(fā)送者。
sg := c.sendq.dequeue()
if sg != nil {
gp := sg.g
unlock(&c.lock)
goready(gp)
}
selected = true
received = true
return
}
對(duì)于 nil channel,總是阻塞。而當(dāng)用 close 關(guān)閉 channel 時(shí),會(huì)喚醒所有排隊(duì)者,讓它們處理完已有的操作,比如已排隊(duì)等待發(fā)送的數(shù)據(jù),或已經(jīng)寫入緩沖區(qū)的數(shù)據(jù)。
chan.go
func closechan(c *hchan) {
// 不要 close nil channel。
if c == nil {
panic("close of nil channel")
}
// 不要多次 close channal。
if c.closed != 0 {
panic("close of closed channel")
}
// 關(guān)閉標(biāo)志。
c.closed = 1
// 喚醒所有排隊(duì)的接收者。
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
gp := sg.g
sg.elem = nil
gp.param = nil // 如果是同步方式,表明是 closechan 喚醒。
goready(gp)
}
// 喚醒所有排隊(duì)的發(fā)送者。
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
gp := sg.g
sg.elem = nil
gp.param = nil
goready(gp)
}
}
總結(jié)規(guī)則如下:
發(fā)送:
編譯器會(huì)將所有 case 轉(zhuǎn)換為 Scase 對(duì)象,注冊(cè)到 select.scase,自然也包括 default。
chan.h
struct Scase
{
void* elem; // data element
Hchan* chan; // chan
uintptr pc; // return pc
uint16 kind;
uint16 so; // vararg of selected bool
bool* receivedp; // pointer to received bool (recv2)
int64 releasetime;
};
struct Select
{
uint16 tcase; // total count of scase[]
uint16 ncase; // currently filled scase[]
uint16* pollorder; // case poll order
Hchan** lockorder; // channel lock order
Scase scase[1]; // one per case (in order of appearance)
};
因?yàn)?case 語句數(shù)量是確定的,因此在初始化時(shí),會(huì)一次性分配所需的全部?jī)?nèi)存。
select.go
func newselect(sel *_select, selsize int64, size int32) {
// 確認(rèn)內(nèi)存長(zhǎng)度。
if selsize != int64(selectsize(uintptr(size))) {
gothrow("bad select size")
}
sel.tcase = uint16(size)
sel.ncase = 0
// 確認(rèn)起始地址。
sel.lockorder = (**hchan)(add(unsafe.Pointer(&sel.scase),
uintptr(size)*unsafe.Sizeof(_select{}.scase[0])))
sel.pollorder = (*uint16)(add(unsafe.Pointer(sel.lockorder),
uintptr(size)*unsafe.Sizeof(*_select{}.lockorder)))
}
// 內(nèi)存組成 select + scase + lockerorder + pollorder。
func selectsize(size uintptr) uintptr {
selsize := unsafe.Sizeof(_select{}) +
(size-1)*unsafe.Sizeof(_select{}.scase[0]) + // Select 已經(jīng)有一個(gè) scase[1]
size*unsafe.Sizeof(*_select{}.lockorder) +
size*unsafe.Sizeof(*_select{}.pollorder)
return round(selsize, _Int64Align)
}
內(nèi)存布局:
+----------+---------------+-------------------+-------------------+
| select | scase array | lockorder array | pollorder array |
+----------+---------------+-------------------+-------------------+
后兩成員是算法需要使用的排序表。pollorder 保存亂序后的 scase 序號(hào),如此遍歷時(shí)就形成了隨機(jī)選擇。而 lockorder 對(duì) case channel 地址排序,當(dāng)多處使用同一 channel時(shí),可避免重復(fù)加鎖。
注冊(cè)函數(shù)并沒多少玄機(jī),無非是通過 ncase 確定注冊(cè)位置。依據(jù) case channel 操作方式,分為 send、recv、default 三種類型。
select.go
func selectsend(sel *_select, c *hchan, elem unsafe.Pointer) (selected bool) {
if c != nil {
selectsendImpl(sel, c, getcallerpc(unsafe.Pointer(&sel)), elem,
uintptr(unsafe.Pointer(&selected))-uintptr(unsafe.Pointer(&sel)))
}
return
}
func selectsendImpl(sel *_select, c *hchan, pc uintptr, elem unsafe.Pointer, so uintptr)
{
// 當(dāng)前注冊(cè)位置。
i := sel.ncase
// 判斷是否超出限制。
if i >= sel.tcase {
gothrow("selectsend: too many cases")
}
// 下一注冊(cè)位置。
sel.ncase = i + 1
// 通過當(dāng)前注冊(cè)位置,獲取 scase 指針。
cas := (*scase)(add(unsafe.Pointer(&sel.scase),
uintptr(i)*unsafe.Sizeof(sel.scase[0])))
cas.pc = pc
cas._chan = c
cas.so = uint16(so)
cas.kind = _CaseSend
cas.elem = elem
}
選擇算法的實(shí)現(xiàn)有些復(fù)雜,又是一個(gè)超長(zhǎng)函數(shù),充斥大量的 goto。
select.go
func selectgo(sel *_select) {
pc, offset := selectgoImpl(sel)
}
func selectgoImpl(sel *_select) (uintptr, uint16) {
scaseslice := sliceStruct{unsafe.Pointer(&sel.scase), int(sel.ncase), ...}
scases := *(*[]scase)(unsafe.Pointer(&scaseslice))
// 填充 pollorder,然后洗牌形成亂序。
pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))
...
// 將 case channel 按地址排序。
lockorder := *(*[]*hchan)(unsafe.Pointer(&lockslice))
...
// 鎖定全部 channel。
sellock(sel)
loop:
// 1: 查找已準(zhǔn)備好的 case。
for i := 0; i < int(sel.ncase); i++ {
// 使用 pollorder 返回 case,這就是 select 隨機(jī)選擇的關(guān)鍵。
cas = &scases[pollorder[i]]
c = cas._chan
switch cas.kind {
case _CaseRecv:
if c.dataqsiz > 0 { // 異步
if c.qcount > 0 { // 有緩沖數(shù)據(jù)
goto asyncrecv
}
} else { // 同步
sg = c.sendq.dequeue()
if sg != nil { // 有接收者
goto syncrecv
}
}
if c.closed != 0 { // 關(guān)閉
goto rclose
}
case _CaseSend:
if c.closed != 0 {
goto sclose
}
if c.dataqsiz > 0 {
if c.qcount < c.dataqsiz {
goto asyncsend
}
} else {
sg = c.recvq.dequeue()
if sg != nil {
goto syncsend
}
}
case _CaseDefault:
dfl = cas
}
}
// 如沒有準(zhǔn)備好的 case,嘗試執(zhí)行 default。
if dfl != nil {
selunlock(sel)
cas = dfl
goto retc
}
// 2: 如果沒有任何準(zhǔn)備好的 case ...
// 打包 SudoG,放到所有 channel 排隊(duì)鏈表,等待喚醒。
gp = getg()
done = 0
for i := 0; i < int(sel.ncase); i++ {
cas = &scases[pollorder[i]]
c = cas._chan
// 創(chuàng)建 SudoG。
sg := acquireSudog()
sg.g = gp
sg.elem = cas.elem
sg.waitlink = gp.waiting
gp.waiting = sg // 全部 SudoG 鏈表。
// 將 SudoG 放到 channel 排隊(duì)鏈表。
switch cas.kind {
case _CaseRecv:
c.recvq.enqueue(sg)
case _CaseSend:
c.sendq.enqueue(sg)
}
}
// 休眠,等待喚醒。
gp.param = nil
gopark(unsafe.Pointer(funcPC(selparkcommit)), unsafe.Pointer(sel), "select")
// 因所有 channel SudoG 都使用當(dāng)前 G,所以可被任何 channel 操作喚醒。
sellock(sel)
sg = (*sudog)(gp.param) // 同步時(shí)指向被喚醒的 SudoG,異步為 nil。
// 3: 找出被喚醒的 case channel。
cas = nil
sglist = gp.waiting
// 遍歷檢查被喚醒的 SudoG 是否是第 2 步創(chuàng)建的。
// 注意,sglist 和 pollorder 順序一致。
for i := int(sel.ncase) - 1; i >= 0; i-- {
k = &scases[pollorder[i]]
// 如果屬于 ...
if sg == sglist {
// 同步喚醒。
cas = k
} else {
// 不屬于,取消排隊(duì)。
c = k._chan
if k.kind == _CaseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
// 異步喚醒,回到第 1 步處理。
if cas == nil {
goto loop
}
// 同步方式下,在被喚醒前,數(shù)據(jù)已經(jīng)完成交換,直接結(jié)束即可。
selunlock(sel)
goto retc
// 下面這些代碼在前一節(jié)已經(jīng)說過,此處忽略。
asyncrecv:
asyncsend:
syncrecv:
rclose:
syncsend:
retc:
return cas.pc, cas.so
sclose:
// send on closed channel
selunlock(sel)
panic("send on closed channel")
}
簡(jiǎn)化后的流程看上去清爽多了。
每次操作都對(duì)所有 channel 加鎖,是個(gè)不小的代價(jià)。
select.go
func sellock(sel *_select) {
var c *hchan
for _, c0 := range lockorder {
// 如果和前一 channel 地址相同,則無需加鎖。
if c0 != nil && c0 != c {
c = c0
lock(&c.lock)
}
}
}
func selunlock(sel *_select) {
n := int(sel.ncase)
r := 0
// 因?yàn)?default case 的 channel 為 nil,排序后總是在 lockorder[0],跳過。
if n > 0 && lockorder[0] == nil {
r = 1
}
for i := n - 1; i >= r; i-- {
c := lockorder[i]
if i > 0 && c == lockorder[i-1] {
continue // will unlock it on the next iteration
}
unlock(&c.lock)
}
}
如果在 select 語句外套上循環(huán),那就意味著每次循環(huán)都要?jiǎng)?chuàng)建對(duì)象,完成注冊(cè)、洗牌、排序、選擇等一大堆操作。
反編譯簡(jiǎn)單示例,看看 defer 的真實(shí)面目。
package main
import ()
func main() {
x := 0x100
defer println(x)
}
(gdb) disas main.main
Dump of assembler code for function main.main:
0x0000000000002016 <+22>:" sub rsp,0x8
0x000000000000201a <+26>:" mov rcx,0x100
0x0000000000002021 <+33>:" mov QWORD PTR [rsp],rcx
0x0000000000002025 <+37>:" lea rcx,[rip+0x4a5fc] # 0x4c628 <main.print.1.f>
0x000000000000202c <+44>:" push rcx
0x000000000000202d <+45>:" push 0x8
0x000000000000202f <+47>:" call 0xad80 <runtime.deferproc>
0x0000000000002034 <+52>:" pop rcx
0x0000000000002035 <+53>:" pop rcx
0x0000000000002036 <+54>:" test rax,rax
0x0000000000002039 <+57>:" jne 0x2046 <main.main+70>
0x000000000000203b <+59>:" nop
0x000000000000203c <+60>:" call 0xb490 <runtime.deferreturn>
0x0000000000002041 <+65>:" add rsp,0x8
0x0000000000002045 <+69>:" ret
不算太復(fù)雜,編譯器將其處理成 deferproc 和 deferreturn 兩個(gè)函數(shù)。
panic.go
func deferproc(siz int32, fn *funcval) {
// 編譯器依次將 args、fn、siz 入棧。
// 通過 fn 在棧上的地址,確認(rèn) args。
argp := uintptr(unsafe.Pointer(&fn))
argp += unsafe.Sizeof(fn)
mp := acquirem()
mp.scalararg[0] = uintptr(siz)
mp.ptrarg[0] = unsafe.Pointer(fn)
mp.scalararg[1] = argp
mp.scalararg[2] = getcallerpc(unsafe.Pointer(&siz))
onM(deferproc_m)
releasem(mp)
}
panic.c
void runtime·deferproc_m(void)
{
siz = g->m->scalararg[0];
fn = g->m->ptrarg[0];
argp = g->m->scalararg[1];
callerpc = g->m->scalararg[2];
d = runtime·newdefer(siz);
d->fn = fn;
d->pc = callerpc;
d->argp = argp;
// 將參數(shù)拷貝到 defer.argp。
runtime·memmove(d+1, (void*)argp, siz);
}
依照 Defer 結(jié)構(gòu)和內(nèi)存對(duì)齊,指針運(yùn)算 "d+1" 就是 argp,只是這寫法真的好嗎?
runtime.h
struct Defer
{
int32 siz;
bool started;
uintptr argp; // where args were copied from
uintptr pc;
FuncVal* fn;
Panic* panic; // panic that is running defer
Defer* link;
};
和以往一樣,Defer 會(huì)被復(fù)用。
runtime.h
struct P
{
Defer* deferpool[5];
};
panic.go
func newdefer(siz int32) *_defer {
var d *_defer
// 按 16 字節(jié)對(duì)齊后計(jì)算長(zhǎng)度索引。
sc := deferclass(uintptr(siz))
mp := acquirem()
// 復(fù)用 P.deferpool[5] 里的 Defer 對(duì)象。
if sc < uintptr(len(p{}.deferpool)) {
pp := mp.p
d = pp.deferpool[sc]
if d != nil {
pp.deferpool[sc] = d.link
}
}
// 超出長(zhǎng)度限制的,直接分配。
if d == nil {
// Allocate new defer+args.
total := goroundupsize(totaldefersize(uintptr(siz)))
d = (*_defer)(mallocgc(total, deferType, 0))
}
d.siz = siz
gp := mp.curg
// 添加到鏈表。
d.link = gp._defer
gp._defer = d
releasem(mp)
return d
}
所有鏈表被保存到 G.defer 鏈表。
runtime.h
struct G
{
Defer* defer;
};
在函數(shù)退出前,deferreturn 完成 Defer 調(diào)用。
panic.go
func deferreturn(arg0 uintptr) {
gp := getg()
// 從鏈表提取一個(gè) Defer。
d := gp._defer
if d == nil {
return
}
// 調(diào)用 deferproc 后,會(huì) pop 掉 siz、fn,那么 arg0 就是 argp。
// 如果地址不等,顯然就屬于無效調(diào)用。
argp := uintptr(unsafe.Pointer(&arg0))
if d.argp != argp {
return
}
mp := acquirem()
// 復(fù)制參數(shù)。
// 很無語,這又出了一個(gè) deferArgs,和前面的 d+1 一個(gè)意思。
// 這真的是同一個(gè)人寫的代碼?
memmove(unsafe.Pointer(argp), deferArgs(d), uintptr(d.siz))
fn := d.fn
d.fn = nil
gp._defer = d.link
freedefer(d)
releasem(mp)
// 執(zhí)行 defer 函數(shù)。
jmpdefer(fn, argp)
}
匯編函數(shù) jmpdefer 很有意思。
asm_amd64.s
// void jmpdefer(fn, sp);
// called from deferreturn.
// 1. pop the caller
// 2. sub 5 bytes from the callers return
// 3. jmp to the argument
TEXT runtime·jmpdefer(SB), NOSPLIT, $0-16
MOVQ fv+0(FP), DX // fn
MOVQ argp+8(FP), BX // caller sp
LEAQ -8(BX), SP // caller sp after CALL
SUBQ $5, (SP) // return to CALL again
MOVQ 0(DX), BX
JMP BX // but first run the deferred function
簡(jiǎn)單點(diǎn)說就是找出 "call deferreturn" 時(shí)入棧的 PC 寄存器地址。
0x000000000000203c <+60>:" call 0xb490 <runtime.deferreturn>
0x0000000000002041 <+65>:" add rsp,0x8
因 PC 寄存器指向下一條指令,那么棧上值應(yīng)該就是 0x2041,減去 call 指令長(zhǎng)度 5,結(jié)果就是 0x203c。將此地址入棧,等 jmpdefer 結(jié)束,deferreturn RET 指令所恢復(fù)的PC 寄存器值就又回到了 "call deferreturn"。配合對(duì) argp 地址檢查,就實(shí)現(xiàn)了函數(shù)內(nèi)多個(gè) Defer 的調(diào)用。
如果調(diào)用 Goexit 終止 goroutine,那么直接循環(huán)調(diào)用鏈上的所有 Defer 即可。
panic.go
func Goexit() {
// Run all deferred functions for the current goroutine.
gp := getg()
for {
d := gp._defer
if d == nil {
break
}
if d.started {
d.fn = nil
gp._defer = d.link
freedefer(d)
continue
}
d.started = true
reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
d._panic = nil
d.fn = nil
gp._defer = d.link
freedefer(d)
}
goexit()
}
回過頭想想,一個(gè)完整 defer 過程要處理緩存對(duì)象,參數(shù)拷貝,以及多次函數(shù)調(diào)用,顯然要比直接函數(shù)調(diào)用慢得多。
var lock sync.Mutex
func test() {
lock.Lock()
lock.Unlock()
}
func testdefer() {
lock.Lock()
defer lock.Unlock()
}
func BenchmarkTest(b *testing.B) {
for i := 0; i < b.N; i++ {
test()
}
}
func BenchmarkTestDefer(b *testing.B) {
for i := 0; i < b.N; i++ {
testdefer()
}
}
BenchmarkTest" 30000000 43.5 ns/op
BenchmarkTestDefer 10000000 211 ns/op
這對(duì)于 CPU 密集型算法有很大影響,需區(qū)別對(duì)待。
Finalizer 用途類似析構(gòu)函數(shù),在關(guān)聯(lián)對(duì)象被回收時(shí)執(zhí)行。
malloc.go
func SetFinalizer(obj interface{}, finalizer interface{}) {
// object 類型信息。
e := (*eface)(unsafe.Pointer(&obj))
etyp := e._type
ot := (*ptrtype)(unsafe.Pointer(etyp))
// 忽略 nil 對(duì)象。
_, base, _ := findObject(e.data)
if base == nil {
if e.data == unsafe.Pointer(&zerobase) {
return
}
}
// finalizer 函數(shù)類型信息。
f := (*eface)(unsafe.Pointer(&finalizer))
ftyp := f._type
// 如果 finalizer 為 nil,清除。
if ftyp == nil {
// switch to M stack and remove finalizer
mp := acquirem()
mp.ptrarg[0] = e.data
onM(removeFinalizer_m)
releasem(mp)
return
}
// 確定 finalizer goroutine 啟動(dòng)。
// 所有可執(zhí)行的 finalizer 都由該 goroutine 執(zhí)行。
createfing()
// 添加 finalizer 記錄。
mp := acquirem()
mp.ptrarg[0] = f.data
mp.ptrarg[1] = e.data
mp.scalararg[0] = nret
mp.ptrarg[2] = unsafe.Pointer(fint)
mp.ptrarg[3] = unsafe.Pointer(ot)
onM(setFinalizer_m)
releasem(mp)
}
malloc.c
void runtime·setFinalizer_m(void)
{
fn = g->m->ptrarg[0];
arg = g->m->ptrarg[1];
nret = g->m->scalararg[0];
fint = g->m->ptrarg[2];
ot = g->m->ptrarg[3];
g->m->scalararg[0] = runtime·addfinalizer(arg, fn, nret, fint, ot);
}
相關(guān)信息打包成 SpecialFinalizer 對(duì)象,添加到關(guān)聯(lián)對(duì)象所在 span.specials 鏈表。
malloc.h
struct Special
{
Special* next; // linked list in span
uint16 offset; // span offset of object
byte kind; // kind of Special
};
struct SpecialFinalizer
{
Special special;
FuncVal* fn;
uintptr nret;
Type* fint;
PtrType* ot;
};
struct MSpan
{
Special *specials; // linked list of special records sorted by offset.
};
mheap.c
bool runtime·addfinalizer(void *p, FuncVal *f, uintptr nret, Type *fint, PtrType *ot)
{
SpecialFinalizer *s;
// 創(chuàng)建 finalizer special 對(duì)象。
s = runtime·FixAlloc_Alloc(&runtime·mheap.specialfinalizeralloc);
s->special.kind = KindSpecialFinalizer;
s->fn = f;
s->nret = nret;
s->fint = fint;
s->ot = ot;
// 添加到待執(zhí)行隊(duì)列。
// 雖然傳遞 s->special,但因地址相同,可轉(zhuǎn)換回 SpecialFinalizer。
if(addspecial(p, &s->special))
return true;
// 添加失敗,表示已存在,放棄。
runtime·FixAlloc_Free(&runtime·mheap.specialfinalizeralloc, s);
return false;
}
static bool addspecial(void *p, Special *s)
{
// 查找 p 所在 span。
span = runtime·MHeap_LookupMaybe(&runtime·mheap, p);
// 確保該 span 已經(jīng)完成垃圾清理。
runtime·MSpan_EnsureSwept(span);
offset = (uintptr)p - (span->start << PageShift);
kind = s->kind;
// 使用 offset、kind 檢查該 special 是否已經(jīng)存在。
t = &span->specials;
while((x = *t) != nil) {
if(offset == x->offset && kind == x->kind) {
return false; // already exists
}
if(offset < x->offset || (offset == x->offset && kind < x->kind))
break;
t = &x->next;
}
// 添加到 span.specials 鏈表。
s->offset = offset;
s->next = x;
*t = s;
return true;
}
當(dāng)執(zhí)行垃圾清理操作時(shí),會(huì)檢查 span.specials 鏈表。如果關(guān)聯(lián)對(duì)象可以被回收,那么就將 finalizer 放到執(zhí)行隊(duì)列。
mgc0.c
bool runtime·MSpan_Sweep(MSpan *s, bool preserve)
{
specialp = &s->specials;
special = *specialp;
while(special != nil) {
// 通過 bitmap 檢查 finalizer 關(guān)聯(lián)對(duì)象狀態(tài)。
p = (byte*)(s->start << PageShift) + special->offset/size*size;
off = (uintptr*)p - (uintptr*)arena_start;
bitp = arena_start - off/wordsPerBitmapByte - 1;
shift = (off % wordsPerBitmapByte) * gcBits;
bits = (*bitp>>shift) & bitMask;
// 如果是不可達(dá)對(duì)象。
if((bits&bitMarked) == 0) {
// 對(duì)象地址。
p = (byte*)(s->start << PageShift) + special->offset;
y = special;
// 從鏈表移除。
special = special->next;
*specialp = special;
// 將 finalizer 添加到執(zhí)行隊(duì)列,釋放 special。
if(!runtime·freespecial(y, p, size, false)) {
// 將關(guān)聯(lián)對(duì)象標(biāo)記為可達(dá)狀態(tài)。
*bitp |= bitMarked << shift;
}
} else {
// 存活對(duì)象,保持 special record。
specialp = &special->next;
special = *specialp;
}
}
}
注意,finalizer 會(huì)導(dǎo)致關(guān)聯(lián)對(duì)象重新變成可達(dá)狀態(tài),也就是說不會(huì)被清理操作回收。這是為了保證在 finalizer 函數(shù)內(nèi)能安全訪問關(guān)聯(lián)對(duì)象。待下次回收時(shí),finalizer 已不存在,關(guān)聯(lián)對(duì)象就可被正常收回。
mheap.c
bool runtime·freespecial(Special *s, void *p, uintptr size, bool freed)
{
SpecialFinalizer *sf;
switch(s->kind) {
case KindSpecialFinalizer:
// 轉(zhuǎn)換回 SpecialFinalizer,放到執(zhí)行隊(duì)列。
sf = (SpecialFinalizer*)s;
runtime·queuefinalizer(p, sf->fn, sf->nret, sf->fint, sf->ot);
// Special 已經(jīng)從 span.specials 移除,回收。
runtime·FixAlloc_Free(&runtime·mheap.specialfinalizeralloc, sf);
return false; // don't free p until finalizer is done
}
}
執(zhí)行隊(duì)列 FinBlock 保存多個(gè)待執(zhí)行的 FianlizerSpecial,而全局變量 finq 用鏈表管理多個(gè) FinBlock。
FinBlock 和其他類型一樣,被緩存、復(fù)用。
malloc.h
struct FinBlock
{
FinBlock *alllink;
FinBlock *next;
int32 cnt;
int32 cap;
Finalizer fin[1];
};
mgc0.c
FinBlock* runtime·finq; // list of finalizers that are to be executed
FinBlock* runtime·finc; // cache of free blocks
mgc0.c
void runtime·queuefinalizer(byte *p, FuncVal *fn, uintptr nret, Type *fint, PtrType *ot)
{
// 檢查是否需要新建 FinBlock。
if(runtime·finq == nil || runtime·finq->cnt == runtime·finq->cap) {
// 如果復(fù)用鏈表為空,新建。
if(runtime·finc == nil) {
runtime·finc = runtime·persistentalloc(FinBlockSize, 0, &mstats.gc_sys);
runtime·finc->cap = (FinBlockSize - sizeof(FinBlock)) / sizeof(Finalizer)+1;
runtime·finc->alllink = runtime·allfin;
runtime·allfin = runtime·finc;
}
block = runtime·finc;
runtime·finc = block->next;
block->next = runtime·finq;
runtime·finq = block;
}
// 添加到 FinBlock 隊(duì)列。
f = &runtime·finq->fin[runtime·finq->cnt];
runtime·finq->cnt++;
f->fn = fn;
f->nret = nret;
f->fint = fint;
f->ot = ot;
f->arg = p;
// 有了新任務(wù),設(shè)置喚醒標(biāo)記。
runtime·fingwake = true;
}
在準(zhǔn)備好執(zhí)行隊(duì)列后,由專門的 goroutine fing 完成最終執(zhí)行操作。
mgc0.c
G* runtime·fing; // goroutine that runs finalizers
malloc.go
var fingCreate uint32
func createfing() {
// 僅執(zhí)行一次。
if fingCreate == 0 && cas(&fingCreate, 0, 1) {
go runfinq()
}
}
func runfinq() {
for {
// 置換全局隊(duì)列。
fb := finq
finq = nil
// 如果隊(duì)列為空,休眠。
if fb == nil {
gp := getg()
fing = gp
fingwait = true // 休眠標(biāo)記。
gp.issystem = true
goparkunlock(&finlock, "finalizer wait")
gp.issystem = false
continue
}
// 循環(huán)處理所有 FinBlock。
for fb != nil {
// 循環(huán)處理 FinBlock 隊(duì)列里所有 FinalizerSpecial。
for i := int32(0); i < fb.cnt; i++ {
// 執(zhí)行 finalizer 函數(shù)。
f := (*finalizer)(add(unsafe.Pointer(&fb.fin),
uintptr(i)*unsafe.Sizeof(finalizer{})))
reflectcall(unsafe.Pointer(f.fn), frame, uint32(framesz),
uint32(framesz))
// 解除引用。
f.fn = nil
f.arg = nil
f.ot = nil
}
fb.cnt = 0
next := fb.next
// 將當(dāng)前 FinBlock 放回復(fù)用鏈表。
fb.next = finc
finc = fb
fb = next
}
}
}
如執(zhí)行隊(duì)列為空,fing 會(huì)被休眠。然后在 M 查找任務(wù)時(shí),嘗試喚醒。
proc.c
// Finds a runnable goroutine to execute.
static G* findrunnable(void)
{
// 如果 fing 被休眠,且喚醒標(biāo)記為真,那么執(zhí)行。
if(runtime·fingwait && runtime·fingwake && (gp = runtime·wakefing()) != nil)
runtime·ready(gp);
}
mgc0.c
G* runtime·wakefing(void)
{
G *res;
// 如正在休眠,且喚醒標(biāo)記為真,返回 fing。
if(runtime·fingwait && runtime·fingwake) {
runtime·fingwait = false; // 取消相關(guān)標(biāo)記。
runtime·fingwake = false;
res = runtime·fing; // 返回 fing。
}
return res;
}
更多建議: