快捷搜索:  汽车  科技

dubbo如何实现节点管理,DubboZipkinBrave实现全链路追踪

dubbo如何实现节点管理,DubboZipkinBrave实现全链路追踪核心源码wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec' java -jar zipkin.jar 输入 http://localhost:9411/zipkin/ 进入WebUI界面如下 重点主要是zipkin及brave使用及特性 当前brave版本为 5.2.0 为 2018年8月份发布的release版本 zipkin版本为2.2.1 所需JDK为1.8快速启动zipkin下载最新的zipkin并启动


最近写了一个链路追踪Demo分享下,实现了链路追踪过程中数据的记录,还有能扩展的地方,后期再继续补充。

源码地址

实现链路追踪的目的

  • 服务调用的流程信息,定位服务调用链
  • 记录调用入参及返回值信息,方便问题重现
  • 记录调用时间线,代码重构及调优处理
  • 调用信息统计

分布式跟踪系统还有其他比较成熟的实现,例如:Naver的Pinpoint、Apache的HTrace、阿里的鹰眼Tracing、京东的Hydra、新浪的Watchman,美团点评的CAT,skywalking等。 本次主要利用dubbo数据传播特性扩展Filter接口来实现链路追踪的目的

重点主要是zipkin及brave使用及特性 当前brave版本为 5.2.0 为 2018年8月份发布的release版本 zipkin版本为2.2.1 所需JDK为1.8

快速启动zipkin

下载最新的zipkin并启动

wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec' java -jar zipkin.jar

输入 http://localhost:9411/zipkin/ 进入WebUI界面如下

dubbo如何实现节点管理,DubboZipkinBrave实现全链路追踪(1)


核心源码

代码的初步版本:方便描述

import brave.Span; import brave.Tracer; import brave.Tracing; import brave.propagation.*; import brave.sampler.Sampler; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.extension.Activate; import com.alibaba.dubbo.common.json.JSON; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.remoting.exchange.ResponseCallback; import com.alibaba.dubbo.rpc.*; import com.alibaba.dubbo.rpc.protocol.dubbo.futureAdapter; import com.alibaba.dubbo.rpc.support.RpcUtils; import zipkin2.codec.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.Sender; import zipkin2.reporter.okhttp3.OkHttpSender; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * Created with IntelliJ IDEA. * * @author: bakerZhu * @description: * @modifytime: */ @Activate(group = {Constants.PROVIDER Constants.CONSUMER}) public class TracingFilter implements Filter { private static final Logger log = LoggerFactory.getLogger(TracingFilter.class); private static Tracing tracing; private static Tracer tracer; private static TraceContext.Extractor<Map<String String>> extractor; private static TraceContext.Injector<Map<String String>> injector; static final Propagation.Getter<Map<String String> String> GETTER = new Propagation.Getter<Map<String String> String>() { @Override public String get(Map<String String> carrier String key) { return carrier.get(key); } @Override public String toString() { return "Map::get"; } }; static final Propagation.Setter<Map<String String> String> SETTER = new Propagation.Setter<Map<String String> String>() { @Override public void put(Map<String String> carrier String key String value) { carrier.put(key value); } @Override public String toString() { return "Map::set"; } }; static { // 1 Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans"); // 2 AsyncReporter asyncReporter = AsyncReporter.builder(sender) .closeTimeout(500 TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); // 3 tracing = Tracing.newBuilder() .localServiceName("tracer-client") .spanReporter(asyncReporter) .sampler(Sampler.ALWAYS_SAMPLE) .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY "user-name")) .build(); tracer = tracing.tracer(); // 4 // 4.1 extractor = tracing.propagation().extractor(GETTER); // 4.2 injector = tracing.propagation().injector(SETTER); } public TracingFilter() { } @Override public Result invoke(Invoker<?> invoker Invocation invocation) throws RpcException { RpcContext rpcContext = RpcContext.getContext(); // 5 Span.Kind kind = rpcContext.isProviderSide() ? Span.Kind.SERVER : Span.Kind.CLIENT; final Span span; if (kind.equals(Span.Kind.CLIENT)) { //6 span = tracer.nextSpan(); //7 injector.inject(span.context() invocation.getAttachments()); } else { //8 TraceContextOrSamplingFlags extracted = extractor.extract(invocation.getAttachments()); //9 span = extracted.context() != null ? tracer.joinSpan(extracted.context()) : tracer.nextSpan(extracted); } if (!span.isNoop()) { span.kind(kind).start(); //10 String service = invoker.getInterface().getSimpleName(); String method = RpcUtils.getMethodName(invocation); span.kind(kind); span.name(service "/" method); InetSocketAddress remoteAddress = rpcContext.getRemoteAddress(); span.remoteIpAndPort( remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName() remoteAddress.getPort()); } boolean isOneway = false deferFinish = false; try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)){ //11 collectArguments(invocation span kind); Result result = invoker.invoke(invocation); if (result.hasException()) { onError(result.getException() span); } // 12 isOneway = RpcUtils.isOneway(invoker.getUrl() invocation); // 13 Future<Object> future = rpcContext.getFuture(); if (future instanceof FutureAdapter) { deferFinish = true; ((FutureAdapter) future).getFuture().setCallback(new FinishSpanCallback(span));// 14 } return result; } catch (Error | RuntimeException e) { onError(e span); throw e; } finally { if (isOneway) { // 15 span.flush(); } else if (!deferFinish) { // 16 span.finish(); } } } static void onError(Throwable error Span span) { span.error(error); if (error instanceof RpcException) { span.tag("dubbo.error_msg" RpcExceptionEnum.getMsgByCode(((RpcException) error).getCode())); } } static void collectArguments(Invocation invocation Span span Span.Kind kind) { if (kind == Span.Kind.CLIENT) { StringBuilder fqcn = new StringBuilder(); Object[] args = invocation.getArguments(); if (args != null && args.length > 0) { try { fqcn.append(JSON.json(args)); } catch (IOException e) { log.warn(e.getMessage() e); } } span.tag("args" fqcn.toString()); } } static final class FinishSpanCallback implements ResponseCallback { final Span span; FinishSpanCallback(Span span) { this.span = span; } @Override public void done(Object response) { span.finish(); } @Override public void caught(Throwable exception) { onError(exception span); span.finish(); } } // 17 private enum RpcExceptionEnum { UNKNOWN_EXCEPTION(0 "unknown exception") NETWORK_EXCEPTION(1 "network exception") TIMEOUT_EXCEPTION(2 "timeout exception") BIZ_EXCEPTION(3 "biz exception") FORBIDDEN_EXCEPTION(4 "forbidden exception") SERIALIZATION_EXCEPTION(5 "serialization exception") ; private int code; private String msg; RpcExceptionEnum(int code String msg) { this.code = code; this.msg = msg; } public static String getMsgByCode(int code) { for (RpcExceptionEnum error : RpcExceptionEnum.values()) { if (code == error.code) { return error.msg; } } return null; } } }

  1. 构建客户端发送工具
  2. 构建异步reporter
  3. 构建tracing上下文
  4. 初始化injector 和 Extractor [tab]4.1 extractor 指数据提取对象 用于在carrier中提取TraceContext相关信息或者采样标记信息到TraceContextOrSamplingFlags 中 -4.2 injector 用于将TraceContext中的各种数据注入到carrier中 其中carrier一半是指数据传输中的载体 类似于Dubbo中Invocation中的attachment(附件集合)
  5. 判断此次调用是作为服务端还是客户端
  6. rpc客户端调用会从ThreadLocal中获取parent的 TraceContext 为新生成的Span指定traceId及 parentId如果没有parent traceContext 则生成的Span为 root span
  7. 将Span绑定的TraceContext中 属性信息 Copy 到 Invocation中达到远程参数传递的作用
  8. rpc服务提供端 从invocation中提取TraceContext相关信息及采样数据信息
  9. 生成span 兼容初次服务端调用
  10. 记录接口信息及远程IP Port
  11. 将创建的Span 作为当前Span (可以通过Tracer.currentSpan 访问到它) 并设置查询范围
  12. oneway调用即只请求不接受结果
  13. 如果future不为空则为 async 调用 在回调中finish span
  14. 设置异步回调,回调代码执行span finish() .
  15. oneway调用 因为不需等待返回值 即没有 cr (Client Receive) 需手动flush()
  16. 同步调用 业务代码执行完毕后需手动finish()
  17. 设置枚举类 与 Dubbo中RpcException保持对应

测试项

  • Dubbo sync async oneway 测试
  • RPC异常测试
  • 普通业务异常测试
  • 并发测试

配置方式

POM依赖添加

<dependency> <groupId>com.github.baker</groupId> <artifactId>Tracing</artifactId> <version>1.0-SNAPSHOT</version> </dependency>

资源目录根路径下添加tracing.properties文件

dubbo如何实现节点管理,DubboZipkinBrave实现全链路追踪(2)

一次调用信息

dubbo如何实现节点管理,DubboZipkinBrave实现全链路追踪(3)

调用链

dubbo如何实现节点管理,DubboZipkinBrave实现全链路追踪(4)

调用成功失败汇总

dubbo如何实现节点管理,DubboZipkinBrave实现全链路追踪(5)

zipkinHost 指定zipkin服务器IP:PORT 默认为localhost:9411 serviceName 指定应用名称 默认为trace-default

调用链:

dubbo如何实现节点管理,DubboZipkinBrave实现全链路追踪(6)

待扩展项

  • 抽象数据传输(扩展Kafka数据传输)
  • 调用返回值数据打印
  • 更灵活的配置方式

猜您喜欢: