新聞中心
Golang中怎么實現(xiàn)百萬級高并發(fā),很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設、高性價比咸陽網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式咸陽網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設找我們,業(yè)務覆蓋咸陽地區(qū)。費用合理售后完善,10余年實體公司更值得信賴。
特別注意:Go語言中的map不是并發(fā)安全的,要想實現(xiàn)并發(fā)安全,需要自己實現(xiàn)(如加鎖),或者使用sync.Map。
package main import ( "fmt" "runtime" "time" ) func main(){ //這里我們假設數(shù)據(jù)是int類型,緩存格式設為100 dataChan:=make(chan int,100) go func(){ for{ select{ case data:=<-dataChan: fmt.Println("data:",data) time.Sleep(1 * time.Second)//這里延遲是模擬處理數(shù)據(jù)的耗時 } } }() //填充數(shù)據(jù) for i:=0;i<100;i++{ dataChan<-i } //這里循環(huán)打印查看協(xié)程個數(shù) for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
這里打印出來的協(xié)程個數(shù)時2,為什么? 因為main方法獨占一個主協(xié)程,我們又起了一個協(xié)程,所以是兩個。
實現(xiàn)百萬級的并發(fā)
首先我們要抽象出幾個概念:
Job: type Job interface { Do() } // 一個數(shù)據(jù)接口,所有的數(shù)據(jù)都要實現(xiàn)該接口,才能被傳遞進來 //實現(xiàn)Job接口的一個數(shù)據(jù)實例,需要實現(xiàn)一個Do()方法,對數(shù)據(jù)的處理就在這個Do()方法中。 Job通道: 這里有兩個Job通道: 1、WorkerPool的Job channel,用于調用者把具體的數(shù)據(jù)寫入到這里,WorkerPool讀取。 2、Worker的Job channel,當WorkerPool讀取到Job,并拿到可用的Worker的時候,會將Job實例寫入該Worker的Job channel,用來直接執(zhí)行Do()方法。 Worker: type Worker struct { JobQueue chan Job //Worker的Job通道 } //每一個被初始化的worker都會在后期單獨占用一個協(xié)程 //初始化的時候會先把自己的JobQueue傳遞到Worker通道中, //然后阻塞讀取自己的JobQueue,讀到一個Job就執(zhí)行Job對象的Do()方法。 工作池(WorkerPool): type WorkerPool struct { workerlen int //WorkerPool中同時 存在Worker的個數(shù) JobQueue chan Job // WorkerPool的Job通道 WorkerQueue chan chan Job } //初始化時會按照傳入的num,啟動num個后臺協(xié)程,然后循環(huán)讀取Job通道里面的數(shù)據(jù), //讀到一個數(shù)據(jù)時,再獲取一個可用的Worker,并將Job對象傳遞到該Worker的chan通道
整個過程中 每個Worker都會被運行在一個協(xié)程中,在整個WorkerPool中就會有num可空閑的Worker,當來一條數(shù)據(jù)的時候,就會在工作池中去一個空閑的Worker去執(zhí)行該Job,當工作池中沒有可用的worker時,就會阻塞等待一個空閑的worker。
這是一個粗糙最簡單的版本,只是為了演示效果,具體使用需要根據(jù)實際情況加一些特殊的處理。
當數(shù)據(jù)無限多的時候func (wp *WorkerPool) Run() 會無限創(chuàng)建協(xié)程,這里需要做一些處理,這里是為了讓所有的請求不等待,并且體現(xiàn)一下最大峰值時的協(xié)程數(shù)。具體因項目而異。
代碼地址:https://github.com/wangzhen0625/gonote/tree/master/7goroutune
main.go
package main import ( "fmt" "runtime" "time" ) type Score struct { Num int } func (s *Score) Do() { fmt.Println("num:", s.Num) time.Sleep(1 * 1 * time.Second) } func main() { num := 100 * 100 * 20 // debug.SetMaxThreads(num + 1000) //設置最大線程數(shù) // 注冊工作池,傳入任務 // 參數(shù)1 worker并發(fā)個數(shù) p := NewWorkerPool(num) p.Run() datanum := 100 * 100 * 100 * 100 go func() { for i := 1; i <= datanum; i++ { sc := &Score{Num: i} p.JobQueue <- sc } }() for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
job.go
package main type Job interface { Do() }
worker.go
package main type Worker struct { JobQueue chan Job } func NewWorker() Worker { return Worker{JobQueue: make(chan Job)} } func (w Worker) Run(wq chan chan Job) { go func() { for { wq <- w.JobQueue select { case job := <-w.JobQueue: job.Do() } } }() }
workerpool.go
package main import "fmt" type WorkerPool struct { workerlen int JobQueue chan Job WorkerQueue chan chan Job } func NewWorkerPool(workerlen int) *WorkerPool { return &WorkerPool{ workerlen: workerlen, JobQueue: make(chan Job), WorkerQueue: make(chan chan Job, workerlen), } } func (wp *WorkerPool) Run() { fmt.Println("初始化worker") //初始化worker for i := 0; i < wp.workerlen; i++ { worker := NewWorker() worker.Run(wp.WorkerQueue) } // 循環(huán)獲取可用的worker,往worker中寫job go func() { for { select { case job := <-wp.JobQueue: worker := <-wp.WorkerQueue worker <- job } } }() }
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。
當前文章:Golang中怎么實現(xiàn)百萬級高并發(fā)
當前路徑:http://www.ef60e0e.cn/article/ihidhd.html