I have a file, part of which looks like the example below, with a timestamp field:
20161203001211,00
20161203001200,00
20161203001500,102
20161203003224,00
20161203001500,00
20161203004211,00
20161203005659,102
20161203000143,103
20161202001643,100
....
I want to process this file by its timestamps and count the occurrences per 15-minute interval. I know how to do it per minute, and I have done it per 10 minutes with an awk script, but I don't know how to get the output below for 15-minute intervals:
startTime-endTime total SUCCESS FAILED
20161203000000-20161203001500 5 3 2
20161203001500-20161203003000 2 1 1
20161203003000-20161203004500 2 2 0
20161203004500-20161203010000 1 0 1
20161202000000-20161202001500 0 0 0
20161202001500-20161202003000 1 0 1
....
00 means success; any other value marks a failed record.
Yes, it is a 24-hour clock, so every hour of the day should get four interval records printed.
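The core of the task, counting totals, successes and failures per 15-minute slot, can be sketched minimally in Python (an illustration on just the nine sample lines shown above; it slices the minutes straight out of the yyyymmddhhmmss stamp instead of going through epoch time):

```python
from collections import defaultdict

def slot(stamp):
    # floor a yyyymmddhhmmss timestamp to the start of its 15-minute slot
    minute = (int(stamp[10:12]) // 15) * 15
    return stamp[:10] + "%02d00" % minute

records = """20161203001211,00
20161203001200,00
20161203001500,102
20161203003224,00
20161203001500,00
20161203004211,00
20161203005659,102
20161203000143,103
20161202001643,100""".splitlines()

counts = defaultdict(lambda: [0, 0, 0])  # slot -> [total, success, failed]
for rec in records:
    stamp, result = rec.split(",")
    c = counts[slot(stamp)]
    c[0] += 1
    if result == "00":
        c[1] += 1  # "00" marks success
    else:
        c[2] += 1  # anything else is a failure

for s in sorted(counts):
    total, ok, failed = counts[s]
    print("%s %d %d %d" % (s, total, ok, failed))
```

On the truncated sample this prints one line per non-empty slot; the full answer below also fills in the empty slots and handles many more requirements.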
Answer 1
Reporting on time-stamped data files; a complex set of requirements
Although the initial question looked rather simple, the context of the question made it quite complicated. The additional circumstances were (as discussed in chat):
- The script needs to merge multiple time-stamped files into one report, possibly spread over several date-stamped folders (depending on the time range that is set).
- The script needs to be able to pick a time range, not only from the timestamps in the file names, but also as a sub-range of the lines read from the files.
- Time sections (quarters of an hour) without any data should show up in the report with "zero" output.
- The time format in the output, both in the report's name and in its lines (per 15 minutes), needs to differ from the input format.
- Lines to be processed have to meet a condition, which the script has to check.
- The relevant data can be at different positions within a line.
- The script needs to be python2.
- The script must take the (variable) difference between local time and UTC into account.
- Extra options were added: export to a basic csv, and columns that can optionally be switched on/off.
- Last but not least: the amount of data to process is beyond huge; thousands of files, hundreds of thousands of lines per file, many gigabytes, millions of lines. In other words: the procedure has to be smart and efficient to process the data within an acceptable time.
Explanation
The end result is too comprehensive to explain in full detail, but for those interested, the main outlines:
- All time calculations are done in epoch time (not surprisingly).
- When reading a file's lines, the first step is to check the condition per line, reducing the number of lines to be processed any further right away.
- For those lines, the timestamp is converted to epoch, divided by 900 (seconds, 15 minutes), rounded down (taking int(n)), and then multiplied by 900 again to calculate the 15-minute section it belongs to.
- The lines are subsequently sorted and grouped with itertools' groupby, and the result per group is generated with the help of ifilter (python2).
- The reports are then first created per file, since the reporting is done per 15 minutes; the report output per file can never exceed more than a few dozen lines, so storing it in memory temporarily cannot be a problem.
- Once all relevant files and lines are processed this way, all per-file reports are finally combined into one final report.
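The flooring and grouping steps above can be sketched like this (a Python 3 compatible illustration with made-up epoch values and hypothetical (epoch, result) pairs; the actual script uses ifilter, which only exists in python2):

```python
from itertools import groupby
from operator import itemgetter

INTERVAL = 900  # 15 minutes in seconds

def to_section(epoch):
    # floor an epoch timestamp to the start of its 15-minute section
    return (epoch // INTERVAL) * INTERVAL

# hypothetical (epoch, result) pairs, already filtered on the condition
rows = [(731, "00"), (30, "102"), (950, "00"), (12, "00")]
rows = sorted((to_section(e), res) for e, res in rows)

report = []
for section, grp in groupby(rows, itemgetter(0)):
    grp = list(grp)
    total = len(grp)
    success = sum(1 for r in grp if r[1] == "00")  # "00" marks success
    report.append([section, total, success, total - success])

# report is now [[0, 3, 2, 1], [900, 1, 1, 0]]
```

Flooring to a multiple of the interval means all lines of the same quarter collapse onto one key, so a single sorted groupby pass produces the per-section counts.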
Despite the huge amount of data, the script does its job remarkably well. While processing, the processor showed about 70% occupation on my 10+ year old system, running steadily, and the computer remained perfectly usable for other tasks.
The script
#!/usr/bin/env python2
import time
import datetime
from itertools import groupby, ifilter
from operator import itemgetter
import sys
import os
import math
"""
folders by day stamp: 20161211 (yyyymmdd)
files by full readable (start) time 20161211093512 (yyyymmddhhmmss) + header / tail
records inside files by full start time 20161211093512 (yyyymmddhhmmss)
commands are in UTC, report name and time section inside files: + timeshift
"""
################## settings ##################
# --- format settings (don't change) ---
readable = "%Y%m%d%H%M%S"
outputformat = "%d-%m-%Y %H:%M"
dateformat = "%Y%m%d"
#---------- time settings ----------
# interval (seconds)
interval = 900
# time shift UTC <> local (hrs)
timeshift = 3.5
# start from (minutes from now in the past)
backintime = 700
# ---- dynamically set values -------
# condition (string/position)
iftrue = ["mies", 2]
# relevant data (timestamp, result)
data = [0, 1]
# datafolder
datafolder = "/home/jacob/Bureaublad/KasIII"
# ----- output columns------
# 0 = timestamp, 1 = total, 2 = SUCCESS, 3 = FAILS
# don't change the order though, distances will mess up
items = [0, 1, 2, 3]
# include simple csv file
csv = True
###############################################
start = sys.argv[1]
end = sys.argv[2]
output_path = sys.argv[3]
timeshift = timeshift*3600
def extraday():
    """
    function to determine what folders possibly contain relevant files
    options: today or *also* yesterday
    """
    current_time = [
        getattr(datetime.datetime.now(), attr)
        for attr in ['hour', 'minute']]
    minutes = (current_time[0]*60)+current_time[1]
    return backintime >= minutes

def set_layout(line):
    # take care of a nice output format
    line = [str(s) for s in line]
    dist1 = (24-len(line[0]))*" "
    dist2 = (15-len(line[1]))*" "
    dist3 = (15-len(line[2]))*" "
    distances = [dist1, dist2, dist3, ""]
    return "".join([line[i]+distances[i] for i in items])

def convert_toepoch(pattern, stamp):
    """
    function to convert readable format (any) into epoch
    """
    return int(time.mktime(time.strptime(stamp, pattern)))

def convert_toreadable(pattern, stamp, shift=0):
    """
    function to convert epoch into readable (any)
    possibly with a time shift
    """
    return time.strftime(pattern, time.gmtime(stamp+shift))

def getrelevantfiles(backtime):
    """
    get relevant files from today's subfolder, from starttime in the past
    input format of backtime is minutes
    """
    allrelevant = []
    # current time, in epoch, to select files
    currt = int(time.time())
    dirs = [convert_toreadable(dateformat, currt)]
    # if backintime > today's "age", add yesterday
    if extraday():
        dirs.append(convert_toreadable(dateformat, currt-86400))
    print("Reading from: "+str(dirs))
    # get relevant files from the folders
    for dr in dirs:
        try:
            relevant = [
                [f, convert_toepoch(readable, f[7:21])]
                for f in os.listdir(os.path.join(datafolder, dr))
                ]
            allrelevant = allrelevant + [
                os.path.join(datafolder, dr, f[0])
                for f in relevant if f[1] >= currt-(backtime*60)
                ]
        except (IOError, OSError):
            print "Folder not found:", dr
    return allrelevant

def readfile(file):
    """
    select the relevant lines from the file, meeting the iftrue condition
    """
    lines = []
    with open(file) as read:
        for l in read:
            l = l.split(",")
            if l[iftrue[1]].strip() == iftrue[0]:
                lines.append([l[data[0]], l[data[1]]])
    return lines

def timeselect(lines):
    """
    select lines from a list that meet the start/end time
    input is the filtered list of lines, by readfile()
    """
    return [l for l in lines if int(start) <= int(l[0]) < int(end)]

def convert_tosection(stamp):
    """
    convert the timestamp in a line to the section (start) it belongs to
    input = timestamp, output = epoch
    """
    return int(convert_toepoch(readable, stamp)/interval)*interval
reportlist = []
foundfiles = getrelevantfiles(backintime)
if foundfiles:
    # the actual work: first a report per file, added to reportlist
    for f in foundfiles:
        # get the lines that match the condition and the start/end window
        lines = timeselect(readfile(f))
        # convert each stamp to the section it belongs to
        for item in lines:
            item[0] = convert_tosection(item[0])
        lines.sort(key=lambda x: x[0])
        for item, occurrence in groupby(lines, itemgetter(0)):
            occ = list(occurrence)
            total = len(occ)
            # ifilter is python2 specific (<> filterfalse in 3)
            success = len(list(ifilter(lambda x: x[1].strip() == "00", occ)))
            fails = total-success
            reportlist.append([item, total, success, fails])
    finalreport = []
    # then group the reports per file into one
    reportlist.sort(key=lambda x: x[0])
    for item, occurrence in groupby(reportlist, itemgetter(0)):
        occ = [it[1:] for it in list(occurrence)]
        output = [str(sum(i)) for i in zip(*occ)]
        output.insert(0, item)
        finalreport.append(output)
    # create a time frame to fill up empty sections
    framestart = int(convert_toepoch(readable, start)/interval)*interval
    # float division, so a partial last section is still rounded up
    frameend = int(math.ceil(convert_toepoch(readable, end)/float(interval)))*interval
    timerange = list(range(framestart, frameend, interval))
    currlisted = [r[0] for r in finalreport]
    extra = [item for item in timerange if item not in currlisted]
    # add the missing time sections
    for item in extra:
        finalreport.append([item, 0, 0, 0])
    finalreport.sort(key=lambda x: x[0])
    print(str(len(finalreport))+" timesections reported")
    # define the output file
    fname1 = convert_toreadable(
        readable,
        convert_toepoch(readable, start),
        timeshift)
    fname2 = convert_toreadable(
        readable,
        convert_toepoch(readable, end),
        timeshift)
    filename = "report_"+fname1+"_"+fname2
    outputfile = os.path.join(output_path, filename)
    # edit the time stamp into the desired output format, add the time shift
    with open(outputfile, "wt") as report:
        report.write(set_layout(["starttime", "total", "SUCCESS", "FAILED"])+"\n")
        for item in finalreport:
            item[0] = convert_toreadable(outputformat, item[0], timeshift)
            report.write(set_layout(item)+"\n")
    if csv:
        with open(outputfile+".csv", "wt") as csv_file:
            csv_file.write(",".join(["starttime", "total", "SUCCESS", "FAILED"])+"\n")
            for item in finalreport:
                # the zero-filled sections hold ints, so stringify first
                csv_file.write(",".join(str(s) for s in item)+"\n")
else:
    print("no files to read")
A small sample of the output
starttime total SUCCESS FAILED
12-12-2016 03:30 2029 682 1347
12-12-2016 03:45 2120 732 1388
12-12-2016 04:00 2082 745 1337
12-12-2016 04:15 2072 710 1362
12-12-2016 04:30 2004 700 1304
12-12-2016 04:45 2110 696 1414
12-12-2016 05:00 2148 706 1442
12-12-2016 05:15 2105 704 1401
12-12-2016 05:30 2040 620 1420
12-12-2016 05:45 2030 654 1376
12-12-2016 06:00 2067 692 1375
12-12-2016 06:15 2079 648 1431
12-12-2016 06:30 2030 706 1324
12-12-2016 06:45 2085 713 1372
12-12-2016 07:00 2064 726 1338
12-12-2016 07:15 2113 728 1385