快捷搜索:  汽车  科技

微服务架构最佳实践(微服务实践之分布式定时任务)

微服务架构最佳实践(微服务实践之分布式定时任务)接下来,我们就可以引入dq了,首先在etc/xxx.yaml下添加dqConfpackage main import ( "flag" "fmt" "fishtwo/app/jobs/message/internal/config" "fishtwo/app/jobs/message/internal/handler" "fishtwo/app/jobs/message/internal/svc" "github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/rest" ) var configFile = flag.String("f"

承接上篇:上篇文章讲到改造 go-zero 生成的 app module 中的 gateway & rpc 。本篇讲讲如何接入 异步任务 以及 log的使用

Delay Job

日常任务开放中,我们会有很多异步、批量、定时、延迟任务要处理,go-zero中有 go-queue,推荐使用 go-queue 去处理,go-queue 本身也是基于 go-zero 开发的,其本身是有两种模式:

  • dq: 依赖于beanstalkd ,分布式,可存储,延迟、定时设置,关机重启可以重新执行,消息会丢失,使用非常简单,go-queue中使用了redis setnx保证了每个消息只被消费一次,使用场景主要是用来做日常任务使用
  • kq:依赖于 kafka ,这个就不多介绍啦,大名鼎鼎的 kafka ,使用场景主要是做日志用

我们主要说一下dq,kq使用也一样的,只是依赖底层不同,如果没使用过beanstalkd,没接触过beanstalkd的可以先google一下,使用起来还是挺容易的。

我在jobs下使用goctl新建了一个message-job.api服务

info( title: //消息任务 desc: // 消息任务 author: "Mikael" email: "13247629622@163.com" ) type BatchSendmessageReq {} type BatchSendMessageResp {} service message-job-api { @handler batchSendMessageHandler // 批量发送短信 post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp) }

因为不需要使用路由,所以handler下的routes.go被我删除了,在handler下新建了一个jobRun.go,内容如下:

package handler import ( "fishtwo/lib/xgo" "fishtwo/app/jobs/message/internal/svc" ) /** * @Description 启动job * @Author Mikael * @Date 2021/1/18 12:05 * @Version 1.0 **/ func JobRun(serverCtx *svc.ServiceContext) { xgo.Go(func() { batchSendMessageHandler(serverCtx) //...many job }) }

其实xgo.Go就是 go batchSendMessageHandler(serverCtx) ,封装了一下go携程,防止野生goroutine panic

然后修改一下启动文件message-job.go

package main import ( "flag" "fmt" "fishtwo/app/jobs/message/internal/config" "fishtwo/app/jobs/message/internal/handler" "fishtwo/app/jobs/message/internal/svc" "github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/rest" ) var configFile = flag.String("f" "etc/message-job-api.yaml" "the config file") func main() { flag.Parse() var c config.Config conf.MustLoad(*configFile &c) ctx := svc.NewServiceContext(c) server := rest.MustNewServer(c.RestConf) defer server.Stop() handler.JobRun(ctx) fmt.Printf("Starting server at %s:%d...\n" c.Host c.Port) server.Start() }

主要是handler.RegisterHandlers(server ctx) 修改为handler.JobRun(ctx)

接下来,我们就可以引入dq了,首先在etc/xxx.yaml下添加dqConf

..... DqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2 Redis: Host: 127.0.0.1:6379 Type: node

我这里本地用不同端口,模拟开了2个节点,7771、7772

在internal/config/config.go添加配置解析对象

type Config struct { .... DqConf dq.DqConf }

修改handler/batchsendmessagehandler.go

package handler import ( "context" "fishtwo/app/jobs/message/internal/logic" "fishtwo/app/jobs/message/internal/svc" "github.com/tal-tech/go-zero/core/logx" ) func batchSendMessageHandler(ctx *svc.ServiceContext){ rootCxt:= context.Background() l := logic.NewBatchSendMessageLogic(context.Background() ctx) err := l.BatchSendMessage() if err != nil{ logx.WithContext(rootCxt).Error("【JOB-ERR】 : % v " err) } }

修改logic下batchsendmessagelogic.go,写我们的consumer消费逻辑

package logic import ( "context" "fishtwo/app/jobs/message/internal/svc" "fmt" "github.com/tal-tech/go-zero/core/logx" ) type BatchSendMessageLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewBatchSendMessageLogic(ctx context.Context svcCtx *svc.ServiceContext) BatchSendMessageLogic { return BatchSendMessageLogic{ Logger: logx.WithContext(ctx) ctx: ctx svcCtx: svcCtx } } func (l *BatchSendMessageLogic) BatchSendMessage() error { fmt.Println("job BatchSendMessage start") l.svcCtx.Consumer.Consume(func(body []byte) { fmt.Printf("job BatchSendMessage %s \n" string(body)) }) fmt.Printf("job BatchSendMessage finish \n") return nil }

这样就大功告成了,启动message-job.go就ok课

go run message-job.go

之后我们就可以在业务代码中向dq添加任务,它就可以自动消费了

producer.Delay 向dq中投递5个延迟任务:

producer := dq.NewProducer([]dq.Beanstalk{ { Endpoint: "localhost:7771" Tube: "tube1" } { Endpoint: "localhost:7772" Tube: "tube2" } }) for i := 1000; i < 1005; i { _ err := producer.Delay([]byte(strconv.Itoa(i)) time.Second * 1) if err != nil { fmt.Println(err) } }

producer.At 可以指定某个时间执行,非常好用,感兴趣的朋友自己可以研究下。

错误日志

在前面说到gateway改造时候,如果眼神好的童鞋,在上面的httpresult.go中已经看到了log的身影:

微服务架构最佳实践(微服务实践之分布式定时任务)(1)

我们在来看下rpc中怎么处理的

微服务架构最佳实践(微服务实践之分布式定时任务)(2)

是的,我在每个rpc启动的main中加入了grpc拦截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那让我们看看grpc拦截器里面做了什么

微服务架构最佳实践(微服务实践之分布式定时任务)(3)

然后我代码里面使用github/pkg/errors这个包去处理错误的 这个包还是很好用的

微服务架构最佳实践(微服务实践之分布式定时任务)(4)

微服务架构最佳实践(微服务实践之分布式定时任务)(5)

所以呢:

我们在 grpc 中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 % v" err)

api 中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : % v " err)

go-zero 中打印日志,使用logx.WithContext会把trace-id带入,这样一个请求下来,比如

user-api --> user-srv --> message-srv

那如果 messsage-srv 出错,他们三个是同一个 trace-id ,是不是就可以在elk通过输入这个trace-id一次性搜索出来这条请求报错堆栈信息呢?当然你也可以接入 jaeger、zipkin、skywalking 等,这个我暂时还没接入。

框架地址

https://github.com/tal-tech/go-zero

欢迎使用 go-zero 并 star 支持我们!

go-zero 系列文章见『微服务实践』公众号

猜您喜欢: