如何自己建一个服务器(我是如何将后台服务器从100台优化到20台)
如何自己建一个服务器(我是如何将后台服务器从100台优化到20台)但从一开始,我们的团队就知道我们应该这样做,因为在讨论阶段,我们看到这可能是一个非常大的交通系统。我已经使用Go两年了,我们已经开发了一些系统,但是没有一个能达到这样的负载。并设置2个不同的集群,一个用于web前端,另一个用于工作人员,这样我们就可以扩大我们可以处理的后台工作的数量。我对网络上的语言和框架之争不是很感兴趣。我认为,效率、生产力和代码可维护性主要取决于您设计解决方案的简单程度。在开发匿名遥测和分析系统时,我们的目标是能够处理来自数百万个端点的大量POST请求。web处理程序将接收一个json文档,其中可能包含需要写入到Amazon S3的许多有效负载的集合,以便我们的map-reduce系统稍后对该数据进行操作。传统上,我们会考虑创建一个工作层架构,利用诸如:
这篇是我翻译的文章,感觉原文章标题《Handling 1 Million Requests per Minute with Golang》不太符合我们标题党,我稍微改了下。
我在几个不同的公司从事反垃圾邮件、反病毒和反恶意软件行业已经超过15年了,现在我知道由于我们每天要处理大量的数据,这些系统可能会变得多么复杂。
目前,我是smsjunk.com的首席执行官和KnowBe4的首席架构师,两家公司都活跃在网络安全行业。
有趣的是,在过去10年左右的时间里,作为一名软件工程师,我所参与的所有web后端开发都是用Ruby on Rails完成的。不要误会我 我爱Ruby on Rails 我相信这是一个很棒的环境 但一段时间后 你开始思考和设计系统的Ruby方法 你忘记如何有效和简单的软件架构可能是如果你可以利用多线程、并行、快速执行和小内存开销。多年来,我是一名C/ c 、Delphi和c#的开发人员,我才开始意识到,使用正确的工具可以使事情变得多么简单。
我对网络上的语言和框架之争不是很感兴趣。我认为,效率、生产力和代码可维护性主要取决于您设计解决方案的简单程度。
问题在开发匿名遥测和分析系统时,我们的目标是能够处理来自数百万个端点的大量POST请求。web处理程序将接收一个json文档,其中可能包含需要写入到Amazon S3的许多有效负载的集合,以便我们的map-reduce系统稍后对该数据进行操作。
传统上,我们会考虑创建一个工作层架构,利用诸如:
- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- and so on…
并设置2个不同的集群,一个用于web前端,另一个用于工作人员,这样我们就可以扩大我们可以处理的后台工作的数量。
但从一开始,我们的团队就知道我们应该这样做,因为在讨论阶段,我们看到这可能是一个非常大的交通系统。我已经使用Go两年了,我们已经开发了一些系统,但是没有一个能达到这样的负载。
我们首先创建几个结构来定义将通过POST调用接收的web请求有效负载,以及一个将其上传到S3 bucket的方法。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadTos3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v" p.storageFolder time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path b int64(b.Len()) contentType acl s3.Options{})
}
天真的Go routines
最初,我们采取了一个非常天真想法,使用routines来处理POST请求,只是试图并行作业处理到一个简单的goroutine:
func payloadHandler(w http.ResponseWriter r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type" "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _ payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
对于中等负载,这对大多数人都适用,但很快证明,在大规模使用时效果不太好。我们预计会有很多请求,但与我们部署第一个版本到生产环境时看到的数量不同。我们完全低估了请求量。
上面的方法在几个不同的方面是不好的。没有办法控制我们生成多少个go例行程序。因为我们每分钟有100万个POST请求当然,这些代码很快就崩溃了。
再试一次我们需要找到一条不同的路。从一开始,我们就开始讨论如何保持请求处理程序的生命周期非常短,并在后台生成处理。当然,这是在Ruby on Rails世界中必须要做的事情,否则您将阻塞所有可用的worker web处理器,无论您使用的是puma、unicorn还是passenger(请不要讨论JRuby)。然后我们就需要利用常见的解决方案来实现这一点,比如Resque、Sidekiq、SQS等。还有很多方法可以实现这个目标。
第二个迭代是创建一个缓冲chan 我们可以排队一些工作和上传到S3 因为我们可以控制项目的最大数量在我们的队列 并有足够的RAM排队工作在内存中 我们认为这是好的在信道队列缓冲工作。
var Queue chan Payload
func init() {
Queue = make(chan Payload MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _ payload := range content.Payloads {
Queue <- payload
}
...
}
然后为了实际地使作业离队列并处理它们,我们使用了类似的东西:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
老实说,我不知道我们在想什么。这一定是一个充满红牛的深夜。这种方法没有给我们带来任何好处,我们用一个缓冲队列交换了有缺陷的并发性,这只是延迟了问题。我们同步处理器只是S3上传一个负载 因为传入请求的速度比单处理器的能力更大的S3上传 我们缓冲通道很快达到其极限和阻塞队列请求处理程序能力更多的物品。
我们只是在避免这个问题,并开始倒数直到我们的系统最终死亡。在部署了这个有bug的版本后,我们的延迟率以恒定的速率增长。
我们决定在使用Go chan时使用一个通用模式,以便创建一个两层chan系统,一个用于排队作业,另一个用于控制有多少worker并发地操作JobQueue。
其想法是将上传到S3的数据并行化,以达到某种可持续的速率,这样就不会损坏机器,也不会从S3生成连接错误。因此我们选择创建一个工作/工作者模式。对于那些熟悉Java、c#等的人来说,可以把它看作是利用通道实现工作线程池的Golang方式。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool
JobChannel: make(chan Job)
quit: make(chan bool)}
}
// Start method starts the run loop for the worker listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s" err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
我们已经修改了Web请求处理程序,以创建一个带有负载的Job struct实例,并将其发送到JobQueue的chan中,以便worker执行。
func payloadHandler(w http.ResponseWriter r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type" "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _ payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
在web服务器初始化过程中,我们创建了一个Dispatcher并调用Run()来创建worker池,并开始侦听将出现在JobQueue中的作业。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
下面是我们的调度程序实现代码:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
请注意,我们提供了要实例化和添加到工作程序池的最大工作程序数。由于我们在这个项目中使用了Amazon Elasticbeanstalk和dockerized Go环境,并且我们总是尝试遵循12因素方法来在生产环境中配置系统,因此我们从环境变量中读取这些值。这样,我们就可以控制worker的数量和作业队列的最大大小,这样我们就可以快速调整这些值,而不需要重新部署集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
在我们部署它之后,我们看到所有的延迟率下降到微不足道的数字,我们处理请求的能力急剧上升。
弹性负载平衡器完全启动几分钟后,我们看到我们的ElasticBeanstalk应用程序每分钟处理将近100万个请求。在早上的几个小时里,我们的请求通常会超过每分钟100万次。
一旦我们部署了新代码,服务器数量就从100台大幅减少到20台左右。
在正确配置集群和自动缩放设置之后,我们能够将其进一步降低到只有4x EC2 c4。大实例和弹性自动伸缩设置,如果CPU连续5分钟超过90%,就会生成一个新实例。
在我的书中,简单才是最好的。我们本可以设计一个包含许多队列、后台多实例和复杂部署的复杂系统,但我们决定利用Elasticbeanstalk自动伸缩的能力,以及Golang为我们提供的效率和简单的并发方法。
一个只有4台机器的集群并不是每天都能处理POST请求,每分钟向Amazon S3 bucket写入100万次,这可能比我现在的MacBook Pro要弱得多。
总有适合的工具。有时候,当您的Ruby on Rails系统需要一个非常强大的web处理程序时,请考虑Ruby生态系统之外的一些更简单、更强大的替代解决方案。
原文:https://medium.com/smsjunk/handling-1-million-requests-per-minute-with-golang-f70ac505fcaa
本人英语不好,基本上google加人工,共勉,一起进步。