GoLang日志监控系统实现

 更新时间:2022年12月15日 10:08:21   作者:上后左爱  
这篇文章主要介绍了GoLang日志监控系统的实现流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

日志监控系统

Nginx(日志文件) -> log_process (实时读取解析写入) -> influxdb(存储) ->grafana(前端日志展示器)

influxdb 属于GO语言编写的开源的时序型数据,着力于高性能 查询与存储时序型数据,influxdb 广泛的应用于存储系统的监控数据,IOT行业的实时数据。

目前市面上流行 TSDB(时序型处理数据库):influxDB, TimescaleDB, QuestDBinfluxDB 类似于NOSQL体验,自动适合标记集模型的技术的数据集;TimescaleDB 与 postgreSQL 兼容, 更加适合物联网数据,与PostgreSQL更好的兼容QuestDB: 支持InfluxDB内联协议和PostgreSQL, 但是生态问题比较大

项目简答介绍

本日志系统 DEMO,但是可以直接使用到生产环境上面,使用LOG_Process 读取Nginx ./Access.log, 使用influxDB 进行存取

log_process -path ./access.log influxdsn http://127.0.0.1:8086@imooc@imoocpass@immoc@s

常见并发模型

  • 解决C10k 的问题 采用异步非阻塞的模型(Nginx, libevent, NodeJS)-- 问题 复杂度高 大量回调函数
  • 协程(Go,Erlang, lua): 协线性函数一样写代码;理解根加轻量级别的线程
  • 程序并行执行 go foo() // 执行函数
  • mgs:= <- c 多个gorountine 需要进行通信
  • select 从多个channel 中读取数据 ,多个 channel 随机选择一个进行消费 
  • 并发: 一个任务通过调度器让任务看起来运行 属于单核CPU(逻辑运行)对于IO密集型比较友好
  • 并行:任务真正的运行

在go 语言中 并发执行 ,使用三个不同 gorountine, 一个负责装填,一个负责运输,一个负责处理 ,让程序并发的运行起来,让任务更加的职责单一化 这种思想 也可以将 日志解析读取,写入模块进行单独小模块,每个模块让使用gorountine ,通过channel 数据交互,至于这么多gorountine 是在一个CPU调度执行还是分配到多个CPU上进行执行 ,取决于系统.

go 语言有自己的调度器, go fun() 属于一个独立的工作单元,go的调度器,根据每个可用的物理处理器分配一个逻辑处理器,通过这个逻辑处理器对 独立单元进行处理,
通过设置: runtime.GOMAXPROCS(1)//给调度器分配多小个具体的逻辑处理器
一台服务器的 物理处理器越多 ,go 获取到逻辑处理器也越多,导致器允许速度越快。 参考:传送门

系统架构

日志解析的基本流程化的伪函数,如下的函数有两个缺陷,解析介入和解析后输出只能写死,所以需要进行扩展,接口方式进行扩展

package main
import (
	"fmt"
	"strings"
	"time"
)
/**
* 日志解析系统分为: 解析,读取,写入
 */
type LogProcess struct {
	path        string      // 读取文件路径
	influxDBDsn string      // influx data source
	rc          chan string // read module to process
	wc          chan string // process to influx
}
// 返回函数使用 指针, 结构体很大 不需要进行拷贝 性能优化
func (l *LogProcess) ReadFromFile() {
	// 文件读取模块
	line := "message"
	l.rc <- line
}
func (l *LogProcess) Process() {
	// 文件解析模块
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}
func (l *LogProcess) writeToInfluxDB() {
	fmt.Println(<-l.wc)
}
func main() {
	// lp 引用类型
	lp := &LogProcess{
		path:        "./tmp/access.log",
		influxDBDsn: "username&password...",
		rc:          make(chan string),
		wc:          make(chan string),
	}
	// tree goroutine run
	go lp.ReadFromFile()
	go lp.Process()
	// 需要定义 chan 将 Process 数据 传递给 influxDB
	go lp.writeToInfluxDB()
	time.Sleep(2 * time.Second)
}

接口方式约束 输入和输出 进行优化

package main
import (
	"fmt"
	"strings"
	"time"
)
/**
* 日志解析系统分为: 解析,读取,写入
 */
type LogProcess struct {
	rc    chan string // read module to process
	wc    chan string // process to influx
	read  Read
	write Writer
}
func (l *LogProcess) Process() {
	// 文件解析模块
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}
type Writer interface {
	writer(wc chan string)
}
type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
	fmt.Println(<-wc)
}
type Read interface {
	read(rc chan string)
}
type ReadFromFile struct {
	path string // 读取文件
}
func (r *ReadFromFile) read(rc chan string) {
	// 读取模块
	line := "message"
	rc <- line
}
func main() {
	// lp 引用类型
	r := &ReadFromFile{
		path: "./tmp/access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan string),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// 通过接口方式 约束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通过参数注入方式
	time.Sleep(2 * time.Second)
}

读取模块具体实现

从上次读取光标后开始逐行进行读取,无需每次都全部文件读取

package main
import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)
/**
* 日志解析系统分为: 解析,读取,写入
 */
type LogProcess struct {
	rc    chan []byte // read module to process
	wc    chan string // process to influx
	read  Read
	write Writer
}
func (l *LogProcess) Process() {
	// 文件解析模块
	for v := range l.rc {
		l.wc <- strings.ToUpper(string(v))
	}
}
type Writer interface {
	writer(wc chan string)
}
type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
	// wc 通道另外一种读取方式
	for x := range wc {
		fmt.Println(x)
	}
}
type Read interface {
	read(rc chan []byte)
}
type ReadFromFile struct {
	path string // 读取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
	// 实时系统: 从文件末尾逐行进行读取
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintln("open file error:%s", err.Error()))
	}
	// 文件末尾最开始进行读取
	f.Seek(0, 2)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// d读取到文件末尾, 日志还没有写入
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintln("ReadBytes error:%s", err.Error()))
		}
		rc <- line[:len(line)-1]
	}
}
func main() {
	// lp 引用类型
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// 通过接口方式 约束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通过参数注入方式
	time.Sleep(100 * time.Second)
}

日志解析模块

  • 冲Read Chan 中读取每一行数据
  • 正则方式提取所需要的监控数据
  • 将数据写入到influxDB
package main
import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"time"
)
/**
* 日志解析系统分为: 解析,读取,写入
 */
type LogProcess struct {
	rc    chan []byte // read module to process
	wc    chan *Message // process to influx
	read  Read
	write Writer
}
//日志写入结构体
type Message struct {
	TimeLocal time.Time
	BytesSent int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime float64
}
func (l *LogProcess) Process() {
	// 通过正则表达式进行解析数据
	r := regexp.MustCompile(`(\s*)`)
	loc, _ := time.LoadLocation("Asia/shanghai")
	// 文件解析模块
	for v := range l.rc {
		ret := r.FindStringSubmatch(string(v))
		if len(ret) != 13 {
			log.Println("FindStringSub match fail:", string(v))
			continue
		}
		message := &Message{
		}
		location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
		if err != nil {
			log.Println("ParseInLocation fail:", err.Error(), ret[4])
		}
		message.TimeLocal = location
		// 字符串类型转换成int
		atoi, err := strconv.Atoi(ret[8])
		if err != nil {
			log.Println("strconv.Atoi fail:", err.Error(), ret[4])
		}
		message.BytesSent = atoi
		l.wc <- message
	}
}
type Writer interface {
	writer(wc chan *Message)
}
type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan *Message) {
	// wc 通道另外一种读取方式
	for x := range wc {
		fmt.Println(x)
	}
}
type Read interface {
	read(rc chan []byte)
}
type ReadFromFile struct {
	path string // 读取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
	// 实时系统: 从文件末尾逐行进行读取
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s\n", err.Error()))
	}
	// 文件末尾最开始进行读取
	f.Seek(0, 2)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// d读取到文件末尾, 日志还没有写入
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
		}
		rc <- line[:len(line)-1]
	}
}
func main() {
	// lp 引用类型
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan *Message),
		read:  r,
		write: w,
	}
	// 通过接口方式 约束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通过参数注入方式
	time.Sleep(100 * time.Second)
}

到此这篇关于GoLang日志监控系统实现的文章就介绍到这了,更多相关GoLang日志监控内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • go语言实现通过FTP库自动上传web日志

    go语言实现通过FTP库自动上传web日志

    这篇文章主要介绍了go语言实现通过FTP库自动上传web日志,非常简单实用,需要的小伙伴快来参考下吧。
    2015-03-03
  • Go 自定义package包设置与导入操作

    Go 自定义package包设置与导入操作

    这篇文章主要介绍了Go 自定义package包设置与导入操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-05-05
  • go如何使用gin结合jwt做登录功能简单示例

    go如何使用gin结合jwt做登录功能简单示例

    jwt全称Json web token,是一种认证和信息交流的工具,这篇文章主要给大家介绍了关于go如何使用gin结合jwt做登录功能的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-01-01
  • Go语言七篇入门教程七GC垃圾回收三色标记

    Go语言七篇入门教程七GC垃圾回收三色标记

    这篇文章主要为大家介绍了Go语言教程关于GC垃圾回收三色标记的示例详解,本篇文章是Go语言七篇入门教程系列文章,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2021-11-11
  • Golang学习笔记(二):类型、变量、常量

    Golang学习笔记(二):类型、变量、常量

    这篇文章主要介绍了Golang学习笔记(二):类型、变量、常量,本文讲解了基本类型、保留字、变量、常量、枚举、运算符、指针、分组声明等内容,需要的朋友可以参考下
    2015-05-05
  • Go语言实现机器大小端判断代码分享

    Go语言实现机器大小端判断代码分享

    这篇文章主要介绍了Go语言实现机器大小端判断代码分享,本文直接给出实现代码,需要的朋友可以参考下
    2014-10-10
  • 一文掌握Go语言并发编程必备的Mutex互斥锁

    一文掌握Go语言并发编程必备的Mutex互斥锁

    Go 语言提供了 sync 包,其中包括 Mutex 互斥锁、RWMutex 读写锁等同步机制,本篇博客将着重介绍 Mutex 互斥锁的基本原理,需要的可以参考一下
    2023-04-04
  • Go语言编译程序从后台运行,不出现dos窗口的操作

    Go语言编译程序从后台运行,不出现dos窗口的操作

    这篇文章主要介绍了Go语言编译程序从后台运行,不出现dos窗口的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-04-04
  • Golang中Set类型的实现方法示例详解

    Golang中Set类型的实现方法示例详解

    这篇文章主要给大家介绍了关于Golang中Set类型实现的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2017-09-09
  • Go Gin实现文件上传下载的示例代码

    Go Gin实现文件上传下载的示例代码

    这篇文章主要介绍了Go Gin实现文件上传下载的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-04-04

最新评论