Processing file records by timestamp in x-minute intervals

I have a file containing a timestamp field; a portion of it looks like the example below:

20161203001211,00
20161203001200,00
20161203001500,102
20161203003224,00
20161203001500,00
20161203004211,00
20161203005659,102
20161203000143,103
20161202001643,100
....

I want to process this file by timestamp, counting occurrences in 15-minute intervals. I know how to do it per minute, and I have done it for 10-minute intervals with an awk script, but I can't work out how to get the following output for 15-minute intervals:

startTime-endTime             total SUCCESS FAILED    
20161203000000-20161203001500 5     3       2
20161203001500-20161203003000 2     1       1
20161203003000-20161203004500 2     2       0
20161203004500-20161203010000 1     0       1
20161202000000-20161202001500 0     0       0
20161202001500-20161202003000 1     0       1
....

00 means success; anything else counts as a failed record.

Yes, it covers 24 hours, so every hour of the day should have 4 interval lines printed.
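For reference, the core of the task is rounding each timestamp down to the start of its 15-minute slot and tallying per slot. A minimal sketch (in portable Python, assuming the comma-separated layout shown above; a hypothetical in-memory `sample`, not the real file, is used for illustration):

```python
import time
from collections import defaultdict

def slot(stamp, interval=900):
    """Round a yyyymmddhhmmss timestamp down to the start of its
    interval, expressed in epoch seconds (900 s = 15 minutes)."""
    epoch = int(time.mktime(time.strptime(stamp, "%Y%m%d%H%M%S")))
    return epoch - (epoch % interval)

# slot -> [total, success, failed]
counts = defaultdict(lambda: [0, 0, 0])

sample = ["20161203001211,00", "20161203001200,00", "20161203001500,102"]
for line in sample:
    stamp, code = line.strip().split(",")
    s = slot(stamp)
    counts[s][0] += 1                       # total
    counts[s][1 if code == "00" else 2] += 1  # success / failed
```

Replacing `sample` with the lines of the real file, and printing each slot's `[total, success, failed]` triple, gives the per-quarter counts; empty slots still have to be filled in separately, as the full answer below does.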

Answer 1

Writing a report on time-stamped data files; complex requirements


While the original question was only slightly complicated in itself, the context of the question made it considerably harder. The additional circumstances were (as discussed in chat):

  • The script needs to combine multiple time-stamped files into one report, possibly spread over multiple date-stamped folders (depending on the time span that is set).
  • The script needs to be able to pick a time span, not only from the timestamps in the file names, but also as a sub-range of the lines read from the files.
  • Time sections (quarters) without data should report a "zero" output.
  • The time format in the output, both in the report's name and in its lines (per 15 minutes), needs to differ from the input format.
  • The lines to be processed have to meet a condition, which the script must check.
  • The relevant data may be in different positions on a line.
  • The script needs to be in python2.
  • The script has to take the (variable) difference between local time and UTC into account.
  • Extra options were added: an option to export to a basic csv, optional columns on/off.
  • Last but not least: the amount of data to process is enormous; thousands of files, each with hundreds of thousands of lines, several GB, millions of lines in total. In other words: the procedure has to be smart and efficient to process the data within a reasonable time.

Explanation

The final result is too comprehensive to explain in detail, but for those who are interested, the headlines:

  • All time calculations are done in epoch time (no surprise).
  • When reading a file's lines, the condition is checked per line first, reducing the number of lines to be processed further right away.
  • From those lines, the timestamps, converted to epoch, are divided by 900 (seconds, 15 minutes), rounded down (taking int(n)), then multiplied by 900 again to calculate the 15-minute section they belong to.
  • The lines are subsequently sorted and grouped by itertools' groupby, and the results per group are generated with the help of ifilter (python2).
  • The reports are then created per file first, since the reports are per 15 minutes a report per file cannot exceed a few dozen lines of output, so storing them in memory temporarily is no problem.
  • Once all relevant files and lines are processed this way, all reports are finally combined into one final report.
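The sort-and-group step above can be sketched as follows (shown in portable Python; the actual script uses python2's `ifilter`, which corresponds to the built-in `filter` in python3). The `rows` sample stands in for (section, result) pairs as they look after the epoch/900 rounding:

```python
from itertools import groupby
from operator import itemgetter

# (section, result) pairs, as produced after the epoch/900 rounding step
rows = [(900, "00"), (0, "102"), (0, "00"), (900, "103"), (0, "00")]

rows.sort(key=itemgetter(0))      # groupby only groups *consecutive* keys
report = []
for section, group in groupby(rows, itemgetter(0)):
    group = list(group)
    total = len(group)
    success = len([r for r in group if r[1] == "00"])
    report.append([section, total, success, total - success])
# report: one [section, total, SUCCESS, FAILED] row per 15-minute slot
```

Sorting before grouping is essential: `groupby` merges only adjacent equal keys, so unsorted input would yield one group per run of equal sections rather than one per section.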

In spite of the huge amount of data, the script does the job quite well. While processing, the processor shows about 70% usage on my over-ten-year-old system, running stable, and the computer remains usable for other tasks.

The script

#!/usr/bin/env python2
import time
import datetime
from itertools import groupby, ifilter
from operator import itemgetter
import sys
import os
import math

"""
folders by day stamp: 20161211 (yyymmdd)
files by full readable (start) time 20161211093512 (yyyymmddhhmmss) + header / tail
records inside files by full start time 20161211093512 (yyyymmddhhmmss)
commands are in UTC, report name and time section inside files: + timeshift
"""

################## settings  ##################

# --- format settings (don't change) ---
readable = "%Y%m%d%H%M%S"
outputformat = "%d-%m-%Y %H:%M"
dateformat = "%Y%m%d"

#---------- time settings ----------
# interval (seconds)
interval = 900
# time shift UTC <> local (hrs)
timeshift = 3.5
# start from (minutes from now in the past)
backintime = 700

# ---- dynamically set values -------
# condition (string/position)
iftrue = ["mies", 2]
# relevant data (timestamp, result)
data = [0, 1]
# datafolder
datafolder = "/home/jacob/Bureaublad/KasIII"

# ----- output columns------
# 0 = timestamp, 1 = total, 2 = SUCCESS, 3 = FAILS
# don't change the order though, distances will mess up
items = [0, 1, 2, 3]
# include simple csv file
csv = True

###############################################

start = sys.argv[1]
end = sys.argv[2]
output_path = sys.argv[3]

timeshift = timeshift*3600

def extraday():
    """
    function to determine what folders possibly contain relevant files
    options: today or *also* yesterday
    """
    current_time = [
        getattr(datetime.datetime.now(), attr) \
        for attr in ['hour', 'minute']]
    minutes = (current_time[0]*60)+current_time[1]                   
    return backintime >= minutes

extraday()

def set_layout(line):
    # take care of a nice output format
    line = [str(s) for s in line]
    dist1 = (24-len(line[0]))*" "
    dist2 = (15-len(line[1]))*" "
    dist3 = (15-len(line[2]))*" "
    distances = [dist1, dist2, dist3, ""]
    displayed = "".join([line[i]+distances[i] for i in items])
    return displayed


    # return line[0]+dist1+line[1]+dist2+line[2]+dist3+line[3]

def convert_toepoch(pattern, stamp):
    """
    function to convert readable format (any) into epoch
    """
    return int(time.mktime(time.strptime(stamp, pattern)))

def convert_toreadable(pattern, stamp, shift=0):
    """
    function to convert epoch into readable (any)
    possibly with a time shift
    """
    return time.strftime(pattern, time.gmtime(stamp+shift))

def getrelevantfiles(backtime):
    """
    get relevant files from todays subfolder, from starttime in the past
    input format of backtime is minutes
    """
    allrelevant = []
    # current time, in epoch, to select files
    currt = int(time.time())
    dirs = [convert_toreadable(dateformat, currt)]
    # if backintime > today's "age", add yesterday
    if extraday():
        dirs.append(convert_toreadable(dateformat, currt-86400))
    print("Reading from: "+str(dirs))
    # get relevant files from folders
    for dr in dirs:
        try:
            relevant = [
                [f, convert_toepoch(readable, f[7:21])]
                for f in os.listdir(os.path.join(datafolder, dr))
                ]
            allrelevant = allrelevant + [
                os.path.join(datafolder, dr, f[0])\
                for f in relevant if f[1] >= currt-(backtime*60)
                ]
        except (IOError, OSError):
            print "Folder not found:", dr
    return allrelevant

def readfile(file):
    """
    create the line list to work with, meeting the iftrue conditions
    select the relevant lines from the file, meeting the iftrue condition
    """
    lines = []
    with open(file) as read:
        for l in read:
            l = l.split(",")
            if l[iftrue[1]].strip() == iftrue[0]:
                lines.append([l[data[0]], l[data[1]]])
    return lines

def timeselect(lines):
    """
    select lines from a list that meet the start/end time
    input is the filtered list of lines, by readfile()
    """
    return [l for l in lines if int(start) <= int(l[0]) < int(end)]

def convert_tosection(stamp):
    """
    convert the timestamp in a line to the section (start) it belongs to
    input = timestamp, output = epoch
    """
    rsection = int(convert_toepoch(readable, stamp)/interval)*interval
    return rsection

reportlist = []

foundfiles = getrelevantfiles(backintime)

if foundfiles:
    # the actual work, first reports per file, add them to reportlist
    for f in foundfiles:
        # create report per file
        # get lines that match condition, match the end/start
        lines = timeselect(readfile(f))
        # get the (time) relevant lines inside the file
        for item in lines:
            # convert stamp to section
            item[0] = convert_tosection(item[0])
        lines.sort(key=lambda x: x[0])
        for item, occurrence in groupby(lines, itemgetter(0)):
            occ = list(occurrence)
            total = len(occ)
            # ifilter is python2 specific (<> filterfalse in 3)
            success = len(list(ifilter(lambda x: x[1].strip() == "00", occ)))
            fails = total-success
            reportlist.append([item, total, success, fails])

    finalreport = []

    # then group the reports per file into one
    reportlist.sort(key=lambda x: x[0])
    for item, occurrence in groupby(reportlist, itemgetter(0)):
        occ = [it[1:] for it in list(occurrence)]
        output = [str(sum(i)) for i in zip(*occ)]
        output.insert(0, item)
        finalreport.append(output)

    # create timeframe to fill up emty sections
    framestart = int(convert_toepoch(readable, start)/interval)*interval
    frameend = int(math.ceil(convert_toepoch(readable, end)/float(interval)))*interval
    timerange = list(range(framestart, frameend, interval))
    currlisted = [r[0] for r in finalreport]
    extra = [item for item in timerange if not item in currlisted]

    # add missing time sections
    for item in extra:
        finalreport.append([item, 0, 0, 0])
    finalreport.sort(key=lambda x: x[0])
    print(str(len(finalreport))+" timesections reported")

    # define output file
    fname1 = convert_toreadable(
        readable,
        convert_toepoch(readable, start),
        timeshift) 
    fname2 = convert_toreadable(
        readable,
        convert_toepoch(readable, end),
        timeshift)
    filename = "report_"+fname1+"_"+fname2
    outputfile = os.path.join(output_path, filename)
    # edit the time stamp into the desired output format, add time shift
    with open(outputfile, "wt") as report:
        report.write(set_layout(["starttime", "total", "SUCCESS", "FAILED"])+"\n")
        for item in finalreport:
            item[0] = convert_toreadable(outputformat, item[0], timeshift)
            report.write(set_layout(item)+"\n")
    if csv:
        with open(outputfile+".csv", "wt") as csv_file:
            csv_file.write(",".join(["starttime", "total", "SUCCESS", "FAILED"])+"\n")
            for item in finalreport:
                csv_file.write(",".join([str(s) for s in item])+"\n")

else:
    print("no files to read")

A small sample of the output

starttime               total          SUCCESS        FAILED
12-12-2016 03:30        2029           682            1347
12-12-2016 03:45        2120           732            1388
12-12-2016 04:00        2082           745            1337
12-12-2016 04:15        2072           710            1362
12-12-2016 04:30        2004           700            1304
12-12-2016 04:45        2110           696            1414
12-12-2016 05:00        2148           706            1442
12-12-2016 05:15        2105           704            1401
12-12-2016 05:30        2040           620            1420
12-12-2016 05:45        2030           654            1376
12-12-2016 06:00        2067           692            1375
12-12-2016 06:15        2079           648            1431
12-12-2016 06:30        2030           706            1324
12-12-2016 06:45        2085           713            1372
12-12-2016 07:00        2064           726            1338
12-12-2016 07:15        2113           728            1385
