首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >golang源码分析:goreplay

golang源码分析:goreplay

作者头像
golangLeetcode
发布2022-12-17 16:32:59
发布2022-12-17 16:32:59
5250
举报

在使用goreplay完成流量录制到es后( mac 上学习k8s系列(52)goreplay流量录制),总有一个疑问,它的具体实现原理是什么?它那么多参数,我应该如何组合使用呢?当一件事觉得很难理解的时候,那就撸下源码,始终要相信任何事物都有一个极其简单的内核。我们还是从main函数开始,代码位于:gor.go,核心的就下面几行

代码语言:javascript
复制
  func main() 
    plugins = NewPlugins()
    emitter := NewEmitter()
    go emitter.Start(plugins, Settings.Middleware)

1,定义插件,包括输入监听处理的插件和输出发送的插件。

2,定义事件分发器

3,开始根据插件和middle分发流量。

so easy 有没有?接着我们看插件的定义,它的代码位于:plugins.go,这里注册了很多常用的插件InputRAW,InputTCP,OutputTCP,InputFile,InputHTTP,OutputHTTP,NewKafkaOutput,NewKafkaInput,如果我们希望自定义,也可以在这里注册

代码语言:javascript
复制
func NewPlugins() *InOutPlugins {
      for _, options := range Settings.InputDummy {
         plugins.registerPlugin(NewDummyInput, options)
         
      for range Settings.OutputDummy {
        plugins.registerPlugin(NewDummyOutput)
    
    if Settings.OutputStdout {
      plugins.registerPlugin(NewDummyOutput)
    
  for _, options := range Settings.InputRAW {
    plugins.registerPlugin(NewRAWInput, options, Settings.InputRAWConfig)
      
   for _, options := range Settings.InputTCP {
     plugins.registerPlugin(NewTCPInput, options, &Settings.InputTCPConfig)
    
   for _, options := range Settings.OutputTCP {
    plugins.registerPlugin(NewTCPOutput, options, &Settings.OutputTCPConfig)
   
   for _, options := range Settings.InputFile {
     plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun)
   
   for _, options := range Settings.InputHTTP {
    plugins.registerPlugin(NewHTTPInput, options)
    
   for _, options := range Settings.OutputHTTP {
    plugins.registerPlugin(NewHTTPOutput, options, &Settings.OutputHTTPConfig)
    
    if Settings.OutputKafkaConfig.Host != "" && Settings.OutputKafkaConfig.Topic != "" {
      plugins.registerPlugin(NewKafkaOutput, "", &Settings.OutputKafkaConfig, &Settings.KafkaTLSConfig)
    
    if Settings.InputKafkaConfig.Host != "" && Settings.InputKafkaConfig.Topic != "" {
    plugins.registerPlugin(NewKafkaInput, "", &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)

下面具体看下插件注册的实现:

代码语言:javascript
复制
func (plugins *InOutPlugins) registerPlugin(constructor interface{}, options ...interface{}) {
    vc := reflect.ValueOf(constructor)
    plugin := vc.Call(vo)[0].Interface()
      
   if r, ok := plugin.(PluginReader); ok {
    plugins.Inputs = append(plugins.Inputs, r)
   
   if w, ok := plugin.(PluginWriter); ok {
    plugins.Outputs = append(plugins.Outputs, w)
    
    plugins.All = append(plugins.All, plugin)

它先通过反射拿到插件的interface,然后判断interface是否实现了PluginReader 和PluginWriter 接口,将插件分成输入插件和输出插件两类存储在数组里。

代码语言:javascript
复制
type PluginReader interface {
  PluginRead() (msg *Message, err error)
}
代码语言:javascript
复制
type PluginWriter interface {
  PluginWrite(msg *Message) (n int, err error)
}

接着看流量转发的实现。代码位于emitter.go:

代码语言:javascript
复制
func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
    middleware := NewMiddleware(middlewareCmd)
    if err := CopyMulty(middleware, plugins.Outputs...)
        for _, in := range plugins.Inputs {
      e.Add(1)
      go func(in PluginReader) {
        defer e.Done()
         if err := CopyMulty(in, plugins.Outputs...); err != nil {

它的核心逻辑是将所有的InputPlugin的数据经过middleware处理后转发给所有的OutputPlugin,转发函数是通过CopyMulty 实现的:

代码语言:javascript
复制
func CopyMulty(src PluginReader, writers ...PluginWriter) error {
      for {
       msg, err := src.PluginRead()
       msg.Data = modifier.Rewrite(msg.Data)
       dst.PluginWrite(msg)      
       for _, dst := range writers {
          if _, err := dst.PluginWrite(msg)

原理也很简单,读取src.PluginRead()的数据,写入dst.PluginWrite(msg)

以上就是goreplay的核心框架源码,核心原理如下图:

对于命令行选项参数定义在settings.go的init函数中,比如 --input-raw

的参数定义:

代码语言:javascript
复制
type AppSettings struct 
  func init() {
    flag.Var(&MultiOption{&Settings.InputRAW}, 

接着我们通过一些常用的插件分析下其实现原理:

1,inputHttp:

它的源码位于input_http.go

代码语言:javascript
复制
  func NewHTTPInput(address string) (i *HTTPInput) {
    i.listen(address)
代码语言:javascript
复制
func (i *HTTPInput) listen(address string) {
    mux.HandleFunc("/", i.handler)
    i.listener, err = net.Listen("tcp", address)
    err = http.Serve(i.listener, mux)
代码语言:javascript
复制
func (i *HTTPInput) handler(w http.ResponseWriter, r *http.Request) {
    buf, _ := httputil.DumpRequestOut(r, true)
    i.data <- buf  
代码语言:javascript
复制
func (i *HTTPInput) PluginRead() (*Message, error) {
    case buf := <-i.data:
      msg.Data = buf
      msg.Meta = payloadHeader(RequestPayload, uuid(), time.Now().UnixNano(), -1)

可以看到,它就是一个简单的http服务器,它的handler只做了一件事情,就是将http请求通过 httputil.DumpRequestOut,序列化到buf里面,然后写入chanel data里面,PluginRead 接口就是负责消费data这个chan里面的数据,是不是贼简单?

2,outputHttp:

代码位于output_http.go:

代码语言:javascript
复制
func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter 
    if o.config.ElasticSearch != "" {
      o.elasticSearch = new(ESPlugin)
      o.elasticSearch.Init(o.config.ElasticSearch)
      o.client = NewHTTPClient(o.config)
      
      for i := 0; i < o.config.WorkersMin; i++ {
          go o.startWorker()
    
      go o.workerMaster()

httpOutput还支持写入es,自己实现es插件需要实现Init接口;我们还是从PluginWrite接口入手:

代码语言:javascript
复制
func (o *HTTPOutput) PluginWrite(msg *Message) (n int, err error) {
      select {
        case <-o.stop:
          return 0, ErrorStopped
        case o.queue <- msg:
      
    if len(o.queue) > 0 {
        if atomic.LoadInt32(&o.activeWorkers) < int32(o.config.WorkersMax) {
            go o.startWorker()
            atomic.AddInt32(&o.activeWorkers, 1)

它将msg放入queue里面,然后实现了一个协程池,启动woker来处理输出。worker,则是不断从queue里面拿消息向外发送请求

代码语言:javascript
复制
func (o *HTTPOutput) startWorker()
    case msg := <-o.queue:
      o.sendRequest(o.client, msg)

workerMaster则是不断销毁已经处理完请求的woker协程

代码语言:javascript
复制
func (o *HTTPOutput) workerMaster()
  if atomic.LoadInt32(&o.activeWorkers) > int32(o.config.WorkersMin) && len(o.queue) < 1 {
      // close one worker
      o.stopWorker <- struct{}{}
      atomic.AddInt32(&o.activeWorkers, -1)

发送请求自然很简单了,就是一个http client 请求而已

代码语言:javascript
复制
func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
    resp, err := client.Send(msg.Data)
      if o.elasticSearch != nil {
          o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)

如果输出需要存入es,则调用es插件的ResponseAnalyze 方法进行数据写入。mac 上学习k8s系列(52)goreplay流量录制 中实现的es.go插件,主要就是实现了下面两个接口。

代码语言:javascript
复制
 func (p *ESPlugin) Init(URI string) {
 func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time)

3,inputKafka:

input_kafka.go它其实就是实现了一个kafka的消费者

代码语言:javascript
复制
func NewKafkaInput(_ string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput {
    con, err = sarama.NewConsumer(strings.Split(config.Host, ","), c)
        go func(consumer sarama.PartitionConsumer) {
      defer consumer.Close()

      for message := range consumer.Messages() {
        i.messages <- message
      }
    }(consumer)
代码语言:javascript
复制
func (i *KafkaInput) PluginRead() (*Message, error) {
    case message = <-i.messages:

4,inputTcp:

input_tcp.go 和http一样,仅仅完成了tcp请求的监听:

代码语言:javascript
复制
func (i *TCPInput) listen(address string) {
    listener, err := tls.Listen("tcp", address, config)
    listener, err := net.Listen("tcp", address)
      go func() {
        for {
          conn, err := i.listener.Accept()
          if err == nil {
            go i.handleConnection(conn)

5,inputRaw:

input_raw.go这是所有插件里面唯一有点技术含量的东东,也是我们最常用的无侵入流量录制用到的插件:

代码语言:javascript
复制
func NewRAWInput(address string, config RAWInputConfig) (i *RAWInput) {
    host, _ports, err := net.SplitHostPort(address)
    i.listen(address)
代码语言:javascript
复制
func (i *RAWInput) PluginRead() (*Message, error) {
      case msgTCP = <-i.listener.Messages():
       msg.Data = msgTCP.Data()
        if msgTCP.Direction == tcp.DirIncoming {

它实现了自己的listen方法:

代码语言:javascript
复制
func (i *RAWInput) listen(address string) {
    i.listener, err = capture.NewListener(i.host, i.ports, i.config)
      errCh := i.listener.ListenBackground(ctx)
      <-i.listener.Reading

底层调用了 capture包的NewListener方法,获得监听器。

capture/capture.go

代码语言:javascript
复制
func NewListener(host string, ports []uint16, config PcapOptions) (l *Listener, err error) {
    l = &Listener{}
  switch config.Engine {
  default:
    l.Activate = l.activatePcap
  case EngineRawSocket:
    l.Activate = l.activateRawSocket
  case EngineAFPacket:
    l.Activate = l.activateAFPacket
  case EnginePcapFile:
    l.Activate = l.activatePcapFile
    return
  case EngineVXLAN:
    l.Activate = l.activateVxLanSocket
    err = l.setInterfaces()

它枚举所有可用的网络设备,然后筛选出我们关心的接口

代码语言:javascript
复制
func (l *Listener) setInterfaces() (err error) {
    pifis, err = pcap.FindAllDevs()
    ifis, _ := net.Interfaces()
    for _, pi := range pifis {
       l.Interfaces = append(l.Interfaces, pi)

引用了大名鼎鼎的"github.com/google/gopacket/pcap"包来做数据监听

代码语言:javascript
复制
func (l *Listener) Listen(ctx context.Context) (err error) {
    for key, handle := range l.Handles {
    go l.readHandle(key, handle)
      for key, handle := range l.Handles {
            if key == in.Name {
              fmt.Println("Activating capture on:", in.Name)
              go l.readHandle(key, handle)

然后根据需要来做数据解析

代码语言:javascript
复制
func (l *Listener) readHandle(key string, hndl packetHandle) {
    messageParser := tcp.NewMessageParser(l.messages, l.ports, hndl.ips, l.config.Expire, l.config.AllowIncomplete)
    data, ci, err := hndl.handler.ReadPacketData()
    messageParser.PacketHandler(&tcp.PcapPacket{
          Data:     data,
          LType:    linkType,
          LTypeLen: linkSize,
          Ci:       &ci,
        })

总结一下:goreplay其实类似一个交换机,它接收inputPlugin的流量,然后分发给outputPlugin,其中inputRaw插件实现了流量的透明录制。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-11-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档