Go语言编写高可用日志收集脚本

 更新时间:2025年09月10日 08:49:54   作者:程序员爱钓鱼  
在分布式系统和微服务架构中,日志是排查问题、审计行为、监控状态的重要来源,本文将编写一个轻量级,高可用的日志收集脚本,有需要的小伙伴可以了解下

在分布式系统和微服务架构中,日志是排查问题、审计行为、监控状态的重要来源。本篇作为《Go语言100个实战案例》中的一篇,带你从设计到实现,完整写出一个轻量级、高可用的日志收集脚本(Agent),能够实时采集多个本地日志文件、处理文件切割(rotation)、按批发送到远端聚合服务,并具备重试、限流和优雅停止能力。

目标与场景

目标:实现一个“可部署到每台机器上”的日志采集脚本(agent),功能包括:

  • 监控并 tail 多个指定日志文件(支持通配符)
  • 处理日志切割(rotation)场景(无需丢失数据)
  • 将日志按批次、JSON 格式发送到远端 HTTP 接收端(可替换为 Kafka/gRPC)
  • 支持并发、限流、指数退避重试和本地缓冲
  • 可优雅停止并保证数据尽可能送达

适用场景:小型到中型集群的轻量采集、调试环境、或作为自研日志管道的一部分。

技术选型(简要)

  • 语言:Go(并发模型天然适合)
  • 文件 tail:github.com/hpcloud/tail(成熟、支持 rotation)——也可用 fsnotify + 自实现 tail,但 hpcloud/tail 工具成熟、代码量少
  • 网络传输:HTTP POST + gzip + JSON(易于接入)
  • 配置:命令行 flags + 简单 JSON/YAML(本文用 flags)
  • 重试策略:指数退避(带上限)

注:示例使用 hpcloud/tail 来可靠处理文件 truncation/rotation,实际生产可替换为更复杂的 offset 存储(保证断点续传)

项目结构(示意)

log-agent/
├─ main.go
├─ sender.go
├─ tailer.go
├─ go.mod

下面直接给出一个 单文件(main.go)  的可运行示例,方便快速理解与使用。

完整代码(main.go)

// main.go
package main

import (
    "bufio"
    "bytes"
    "compress/gzip"
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "net/http"
    "os"
    "os/signal"
    "path/filepath"
    "sync"
    "syscall"
    "time"

    "github.com/hpcloud/tail"
)

// LogRecord 定义发送到服务器的 JSON 结构
type LogRecord struct {
    Timestamp time.Time `json:"timestamp"`
    Host      string    `json:"host"`
    Path      string    `json:"path"`
    Line      string    `json:"line"`
}

// Config
var (
    globPattern = flag.String("paths", "/var/log/*.log", "日志文件路径,支持通配符")
    endpoint    = flag.String("endpoint", "http://127.0.0.1:8080/ingest", "日志收集服务地址")
    batchSize   = flag.Int("batch", 200, "每次发送最大条数")
    batchWait   = flag.Duration("wait", 2*time.Second, "批量发送最大等待时间")
    workers     = flag.Int("workers", 4, "并发发送 worker 数")
    maxQueue    = flag.Int("queue", 2000, "本地队列最大条数,超出丢弃最老")
)

func main() {
    flag.Parse()

    host, _ := os.Hostname()

    paths, err := filepath.Glob(*globPattern)
    if err != nil {
        fmt.Fprintf(os.Stderr, "invalid pattern: %v\n", err)
        os.Exit(1)
    }
    if len(paths) == 0 {
        fmt.Fprintf(os.Stderr, "no logs matched pattern: %s\n", *globPattern)
        os.Exit(1)
    }

    // context for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // signal handling
    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigc
        fmt.Println("received shutdown signal, stopping...")
        cancel()
    }()

    // central channel for lines
    lineCh := make(chan LogRecord, *maxQueue)

    var wg sync.WaitGroup

    // start tailers
    for _, p := range paths {
        wg.Add(1)
        go func(path string) {
            defer wg.Done()
            if err := tailFile(ctx, path, host, lineCh); err != nil {
                fmt.Fprintf(os.Stderr, "tail %s error: %v\n", path, err)
            }
        }(p)
    }

    // start sender workers
    senderWg := &sync.WaitGroup{}
    for i := 0; i < *workers; i++ {
        senderWg.Add(1)
        go func(id int) {
            defer senderWg.Done()
            runSender(ctx, id, lineCh, *endpoint, *batchSize, *batchWait)
        }(i)
    }

    // wait for tailers to finish (on ctx cancel they will exit)
    wg.Wait()
    // close channel to signal senders to flush and exit
    close(lineCh)
    // wait for senders to finish
    senderWg.Wait()

    fmt.Println("agent stopped")
}

// tailFile 使用 hpcloud/tail 跟踪文件
func tailFile(ctx context.Context, path, host string, out chan<- LogRecord) error {
    cfg := tail.Config{
        Follow:    true,
        ReOpen:    true, // 支持日志切割后重新打开
        MustExist: false,
        Poll:      true,
        Logger:    tail.DiscardingLogger,
    }
    t, err := tail.TailFile(path, cfg)
    if err != nil {
        return err
    }
    defer t.Cleanup()

    for {
        select {
        case <-ctx.Done():
            t.Cleanup()
            return nil
        case line, ok := <-t.Lines:
            if !ok {
                // channel closed; end
                return nil
            }
            if line == nil {
                continue
            }
            rec := LogRecord{
                Timestamp: time.Now().UTC(),
                Host:      host,
                Path:      path,
                Line:      line.Text,
            }
            // non-blocking send to avoid blocking tail; drop oldest if full
            select {
            case out <- rec:
            default:
                // drop one and push new (simple policy)
                select {
                case <-out:
                default:
                }
                select {
                case out <- rec:
                default:
                    // give up if still full
                }
            }
        }
    }
}

// runSender 聚合并发送日志,带简单重试
func runSender(ctx context.Context, id int, in <-chan LogRecord, endpoint string, batchSize int, batchWait time.Duration) {
    httpClient := &http.Client{
        Timeout: 10 * time.Second,
    }
    buf := make([]LogRecord, 0, batchSize)

    sendBatch := func(batch []LogRecord) error {
        if len(batch) == 0 {
            return nil
        }
        // marshal
        data, err := json.Marshal(batch)
        if err != nil {
            return err
        }
        // gzip body
        var b bytes.Buffer
        gw := gzip.NewWriter(&b)
        if _, err := gw.Write(data); err != nil {
            _ = gw.Close()
            return err
        }
        _ = gw.Close()

        req, _ := http.NewRequest("POST", endpoint, &b)
        req.Header.Set("Content-Encoding", "gzip")
        req.Header.Set("Content-Type", "application/json")
        // retry with exponential backoff
        var attempt int
        for {
            attempt++
            resp, err := httpClient.Do(req)
            if err == nil {
                io.Copy(io.Discard, resp.Body)
                resp.Body.Close()
                if resp.StatusCode >= 200 && resp.StatusCode < 300 {
                    return nil
                }
                err = fmt.Errorf("bad status: %s", resp.Status)
            }
            // on ctx done, abort immediately
            select {
            case <-ctx.Done():
                return fmt.Errorf("context canceled")
            default:
            }
            if attempt >= 5 {
                return err
            }
            // backoff
            sleep := time.Duration(500*(1<<uint(attempt-1))) * time.Millisecond
            if sleep > 10*time.Second {
                sleep = 10 * time.Second
            }
            time.Sleep(sleep)
        }
    }

    timer := time.NewTimer(batchWait)
    defer timer.Stop()

    for {
        select {
        case <-ctx.Done():
            // flush remaining
            _ = sendBatch(buf)
            return
        case rec, ok := <-in:
            if !ok {
                // channel closed -> flush and exit
                _ = sendBatch(buf)
                return
            }
            buf = append(buf, rec)
            if len(buf) >= batchSize {
                _ = sendBatch(buf)
                buf = buf[:0]
                if !timer.Stop() {
                    select {
                    case <-timer.C:
                    default:
                    }
                }
                timer.Reset(batchWait)
            }
        case <-timer.C:
            if len(buf) > 0 {
                _ = sendBatch(buf)
                buf = buf[:0]
            }
            timer.Reset(batchWait)
        }
    }
}

使用方法

1.初始化模块并获取依赖:

go mod init example.com/log-agent
go get github.com/hpcloud/tail
go build -o log-agent main.go

2.运行(示例):

./log-agent -paths "/var/log/myapp/*.log" -endpoint "http://log-collector:8080/ingest" -batch 100 -workers 4

3.建议把 agent 用 systemd 管理或容器化部署为 DaemonSet(K8s)或 sidecar。

实践要点与注意事项

日志切割:使用 ReOpen: true 可处理 logrotate 产生的新文件句柄;生产环境建议结合 inode 校验与持久化 offset(例如把 offset 存到本地文件或 SQLite)以支持重启断点续传。

传输安全:生产环境使用 HTTPS + 鉴权(API Key / mTLS)来防止日志被窃取或篡改。

后端吞吐:发送端需要限流与批次控制,避免短时间内把流量拉爆目标端。也可以使用本地磁盘队列(如 diskqueue)在网络中断时持久化缓存。

结构化日志:尽量让应用输出结构化 JSON 日志,这样聚合与查询更强。若是 plain text,可在 agent 处做简单解析(regex)或转发原始行。

监控与自检:给 agent 加入心跳/metrics(Prometheus)接口,监控发送失败数、队列长度等关键指标。

日志隐私:注意日志中可能包含敏感数据(PII、密码、token),可在 agent 端进行脱敏或过滤再上报。

进一步改进(思路)

  • 使用持久化队列(disk-backed)保证断网或进程崩溃后不丢日志。
  • 支持多种传输后端:Kafka、gRPC、AWS S3、Elasticsearch 等。
  • 支持日志标签(service、env、pod)自动注入(从系统 / 环境变量获取)。
  • 增加插件化解析器(nginx、app custom parser)做字段抽取。
  • 通过 Web UI 或配置中心动态下发采集规则。

总结

这篇文章展示了如何用 Go 快速实现一个可靠、可扩展的日志收集脚本:从文件采集、切割处理,到批量发送与重试策略,都给出了实际可运行的示例代码。实现中充分利用了 Go 的并发、channel 与 context,代码简洁、易扩展。把这个 agent 打包部署在每台节点上,就能为后端日志聚合系统提供稳定可靠的数据源。

到此这篇关于Go语言编写高可用日志收集脚本的文章就介绍到这了,更多相关Go日志收集内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 详解Go语言如何实现一个最简化的协程池

    详解Go语言如何实现一个最简化的协程池

    这篇文章主要为大家详细介绍了Go语言如何实现一个最简化的协程池,文中的示例代码讲解详细,具有一定的参考价值,有需要的小伙伴可以了解一下
    2023-10-10
  • Golang记录、计算函数执行耗时、运行时间的一个简单方法

    Golang记录、计算函数执行耗时、运行时间的一个简单方法

    这篇文章主要介绍了Golang记录、计算函数执行耗时、运行时间的一个简单方法,本文直接给出代码实例,需要的朋友可以参考下
    2015-07-07
  • Go语言基础结构体用法及示例详解

    Go语言基础结构体用法及示例详解

    这篇文章主要为大家介绍了Go语言基础结构体的用法及示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步
    2021-11-11
  • 关于Go语言中的IO操作详解

    关于Go语言中的IO操作详解

    在现代软件开发中,高效的输入输出(I/O)操作是提高程序性能的关键之一,Go语言提供了丰富的I/O操作接口,使得文件读写、网络通信等任务变得简单而高效,本文介绍了关于Go语言中的IO操作,需要的朋友可以参考下
    2024-10-10
  • 详解Go语言中io/ioutil工具的使用

    详解Go语言中io/ioutil工具的使用

    这篇文章主要为大家详细介绍了Go语言中io/ioutil工具的使用,从而简化文件操作。文中是示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2022-05-05
  • golang redigo发布订阅使用的方法

    golang redigo发布订阅使用的方法

    本文主要介绍了golang redigo发布订阅使用的方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-10-10
  • 利用golang进行OpenCV学习和开发的步骤

    利用golang进行OpenCV学习和开发的步骤

    目前,OpenCV逐步成为一个通用的基础研究和产品开发平台,下面这篇文章主要给大家介绍了关于利用golang进行OpenCV学习和开发的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2018-09-09
  • 基于Go语言实现猜谜游戏

    基于Go语言实现猜谜游戏

    这篇文章主要为大家详细介绍了如何基于Go语言实现猜谜游戏,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习
    2023-09-09
  • golang chan传递数据的性能开销详解

    golang chan传递数据的性能开销详解

    这篇文章主要为大家详细介绍了Golang中chan在接收和发送数据时因为“复制”而产生的开销,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下
    2024-01-01
  • golang格式化输出函数printf、sprintf、fprintf解读

    golang格式化输出函数printf、sprintf、fprintf解读

    这篇文章主要介绍了golang格式化输出函数printf、sprintf、fprintf,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-07-07

最新评论