Python 获取 datax 执行结果保存到数据库的方法

 更新时间:2019年07月11日 10:30:49   作者:薛定谔的DBA  
今天小编就为大家分享一篇Python 获取 datax 执行结果保存到数据库的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

执行 datax 作业,创建执行文件,在 crontab 中每天1点(下面有关系)执行:

其中 job_start 及 job_finish 这两行记录是自己添加的,为了方便识别出哪张表。

#!/bin/bash
source /etc/profile
user1="root"
pass1="pwd"
user2="root"
pass2="pwd"
job_path="/opt/datax/job/"
 
jobfile=(
job_table_a.json
job_table_b.json
)
 
for filename in ${jobfile[@]}
do
	echo "job_start: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
	python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" ${job_path}${filename}
	echo "job_finish: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
done
 
# 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1
# egrep '任务|速度|总数|job_start|job_finish' /opt/datax/job/log/

datax 执行日志:

job_start: 2018-08-08 01:13:28 job_table_a.json
任务启动时刻          : 2018-08-08 01:13:28
任务结束时刻          : 2018-08-08 01:14:49
任务总计耗时          :         81s
任务平均流量          :     192.82KB/s
记录写入速度          :      1998rec/s
读出记录总数          :       159916
读写失败总数          :          0
job_finish: 2018-08-08 01:14:49 job_table_a.json
job_start: 2018-08-08 01:14:49 job_table_b.json
任务启动时刻          : 2018-08-08 01:14:50
任务结束时刻          : 2018-08-08 01:15:01
任务总计耗时          :         11s
任务平均流量          :        0B/s
记录写入速度          :       0rec/s
读出记录总数          :          0
读写失败总数          :          0
job_finish: 2018-08-08 01:15:01 job_table_b.json

接下来读取这些信息保存到数据库,在数据库中创建表:

CREATE TABLE `datax_job_result` (
 `log_file` varchar(200) DEFAULT NULL,
 `job_file` varchar(200) DEFAULT NULL,
 `start_time` datetime DEFAULT NULL,
 `end_time` datetime DEFAULT NULL,
 `seconds` int(11) DEFAULT NULL,
 `traffic` varchar(50) DEFAULT NULL,
 `write_speed` varchar(50) DEFAULT NULL,
 `read_record` int(11) DEFAULT NULL,
 `failed_record` int(11) DEFAULT NULL,
 `job_start` varchar(200) DEFAULT NULL,
 `job_finish` varchar(200) DEFAULT NULL,
 `insert_time` datetime DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

定时执行以下文件,因为 datax 作业 1 点执行,为了获取一天内最新生产的日志,脚本中取 82800内生产的日志文件,及23 小时内生产的那个最新日志。所以一天内任何时间执行都可以。此文件也是定时每天执行(判断 datax 作业完成后执行)

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1
 
import re
import os
import sqlalchemy
import pandas as pd
import datetime as dt
 
def save_to_db(df):
	engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test", encoding="utf-8") 
	df.to_sql("datax_job_result", engine, index=False, if_exists='append') 
 
def get_the_latest_file(path):
	t0 = dt.datetime.utcfromtimestamp(0)
	d2 = (dt.datetime.now() - t0).total_seconds()
	d1 = d2 - 82800
	for (dirpath, dirnames, filenames) in os.walk(path):
		for filename in sorted(filenames, reverse = True):
			if filename.endswith(".log"):
				f = os.path.join(dirpath,filename)
				ctime = os.stat(f)[-1]
				if ctime>=d1 and ctime <=d2:
					return f
			
def get_job_result_from_logfile(path):
	result = pd.DataFrame(columns=['log_file','job_file','start_time','end_time','seconds','traffic','write_speed','read_record','failed_record','job_start','job_finish'])
	log_file = get_the_latest_file(path)
	index = 0
	content = open(log_file, "r")
	for line in content:
		result.loc[index, 'log_file'] = log_file
		if re.compile(r'job_start').match(line):
			result.loc[index, 'job_file'] = line.split(' ')[4].strip()
			result.loc[index, 'job_start'] = line,
		elif re.compile(r'任务启动时刻').match(line):
			result.loc[index, 'start_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()
		elif re.compile(r'任务结束时刻').match(line):
			result.loc[index, 'end_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()
		elif re.compile(r'任务总计耗时').match(line):
			result.loc[index, 'seconds'] = line.split(':')[1].strip().replace('s','')
		elif re.compile(r'任务平均流量').match(line):
			result.loc[index, 'traffic'] = line.split(':')[1].strip()
		elif re.compile(r'记录写入速度').match(line):
			result.loc[index, 'write_speed'] = line.split(':')[1].strip()
		elif re.compile(r'读出记录总数').match(line):
			result.loc[index, 'read_record'] = line.split(':')[1].strip()
		elif re.compile(r'读写失败总数').match(line):
			result.loc[index, 'failed_record'] = line.split(':')[1].strip()
		elif re.compile(r'job_finish').match(line):
			result.loc[index, 'job_finish'] = line,
			index = index + 1
		else:
			pass
	save_to_db(result)
 
get_job_result_from_logfile("/opt/datax/job/log")

以上这篇Python 获取 datax 执行结果保存到数据库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Pandas分组与排序的实现

    Pandas分组与排序的实现

    这篇文章主要介绍了Pandas分组与排序的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-07-07
  • 解决安装python3.7.4报错Can''''t connect to HTTPS URL because the SSL module is not available

    解决安装python3.7.4报错Can''''t connect to HTTPS URL because the S

    这篇文章主要介绍了解决安装python3.7.4报错Can't connect to HTTPS URL because the SSL module is not available,本文给大家简单分析了错误原因,给出了解决方法,需要的朋友可以参考下
    2019-07-07
  • Python Pycharm虚拟下百度飞浆PaddleX安装报错问题及处理方法(亲测100%有效)

    Python Pycharm虚拟下百度飞浆PaddleX安装报错问题及处理方法(亲测100%有效)

    最近很多朋友给小编留言在安装PaddleX的时候总是出现各种奇葩问题,不知道该怎么处理,今天小编通过本文给大家介绍下Python Pycharm虚拟下百度飞浆PaddleX安装报错问题及处理方法,真的有效,遇到同样问题的朋友快来参考下吧
    2021-05-05
  • pytorch tensor合并与分割方式

    pytorch tensor合并与分割方式

    这篇文章主要介绍了pytorch tensor合并与分割方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-02-02
  • Python实现pdf文档转txt的方法示例

    Python实现pdf文档转txt的方法示例

    这篇文章主要介绍了Python实现pdf文档转txt的方法,结合实例形式分析了Python基于第三方库pdfminier实现针对pdf格式文档的读取、转换等相关操作技巧,需要的朋友可以参考下
    2018-01-01
  • Python爬虫基础之爬虫的分类知识总结

    Python爬虫基础之爬虫的分类知识总结

    来给大家讲python爬虫的基础啦,首先我们从爬虫的分类开始讲起,下文有非常详细的知识总结,对正在学习python的小伙伴们很有帮助,需要的朋友可以参考下
    2021-05-05
  • python处理写入数据代码讲解

    python处理写入数据代码讲解

    在本篇文章里小编给大家整理的是一篇关于python处理写入数据代码讲解内容,有兴趣的朋友们可以学习下。
    2020-10-10
  • Python基于jieba, wordcloud库生成中文词云

    Python基于jieba, wordcloud库生成中文词云

    这篇文章主要介绍了Python基于jieba, wordcloud库生成中文词云,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • python开发之str.format()用法实例分析

    python开发之str.format()用法实例分析

    这篇文章主要介绍了python开发之str.format()用法,结合实例形式较为详细的分析了str.format()函数的功能,使用方法与相关注意事项,代码包含详尽的注释说明,需要的朋友可以参考下
    2016-02-02
  • wxPython实现分隔窗口

    wxPython实现分隔窗口

    这篇文章主要为大家详细介绍了wxPython实现分隔窗口,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-11-11

最新评论