根据另一个 CSV 中的值过滤一个非常大的 CSV

根据另一个 CSV 中的值过滤一个非常大的 CSV

我正在处理一些不适合 RAM 的 CSV 文件。

这 2 个 CSV 文件具有以下结构:

第一个.csv

ID 姓名 时间戳
连续剧 斯特 年-月-日 时:分:秒

第二个.csv

ID 姓名 日期
连续剧 斯特 年-月-日

目标是从中选择first.csv与以下条件匹配的行second.csv

  • name是平等的
  • timestamp范围为[ date-1, date+1]。

迭代所有这些行后,可以将输出合并到一个输出文件中。

答案1

我不知道在 shell 中这有什么可能,但我认为它很难编写,而且以后很难阅读(也许修改)。

我已经针对基本 CSV 任务(选择/删除列、过滤行)对 Go 与 awk 进行了测试,Go 更快(有时“快得多”)。

对于您的帖子,我制作了一个包含 8,640,001 行和约 271 MB 的测试文件,然后制作了 2 个示例处理器,一个使用 Python,另一个使用 Go,它们利用即读即写模式,因此没有中间存储(并且两者都使用缓冲 IO,这可以提高大文件的效率)。

  • Python脚本:运行约 70 秒,使用约 6.5 MB 内存
  • 转为二进制:运行时间约 3.5 秒,使用约 10 MB 内存

但首先,它能完成这项工作吗?

基本设置

我制作了这两个小样本来开发:

第一个.csv

id,name,timestamp
1,foo,2000-01-01 00:00:00
2,foo,2000-01-02 00:00:00
3,foo,2000-01-03 00:00:00
4,foo,2000-01-04 00:00:00
5,foo,2000-01-05 00:00:00
6,bar,2000-02-01 00:00:00
7,bar,2000-02-02 00:00:00
8,bar,2000-02-03 00:00:00
9,bar,2000-02-04 00:00:00

第二个.csv

id,name,date
10,foo,2000-01-03
11,bar,2000-02-02

目前尚不清楚“date-1”和“date+1”的含义,因此我假设您想要“加或减一天”。

当我针对这些文件运行 Go 或 Python 代码时,我得到:

2,foo,2000-01-02 00:00:00
3,foo,2000-01-03 00:00:00
4,foo,2000-01-04 00:00:00
6,bar,2000-02-01 00:00:00
7,bar,2000-02-02 00:00:00
8,bar,2000-02-03 00:00:00

鉴于我对您的要求和输入的解释,这是我所期望的:

foo 2000-01-03bar 2000-02-02

测试文件

我制作了这个测试生成器,它仅为 foo 创建记录,间隔 1 秒,持续 100 天:

import csv
from datetime import datetime, timedelta

dt_start = datetime(2000, 1, 1)

with open('test.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['id', 'name', 'timestamp'])

    # 1 line per second for 100 days
    for i in range(86400 * 100):
        plus_secs = timedelta(seconds=i + 1)
        writer.writerow([i + 1, 'foo', dt_start + plus_secs])

这是什么测试.csv好像:

% ll test.csv 
-rw-r--r--  1 alice  bob   271M Nov 19 22:19 test.csv

% wc -l test.csv 
 8640001 test.csv

将测试文件链接到first.csv,ln -fs test.csv first.csv我准备运行以下命令...

Python

import csv
import sys
from datetime import datetime, timedelta

DATE_FMT = f'%Y-%m-%d'
DATETIME_FMT = f'%Y-%m-%d %H:%M:%S'

# Create lookup from second

# {name: [date-1day, date+1day]}
lookup = {}

with open('second.csv', newline='') as f:
    reader = csv.reader(f)
    header = next(reader)
    nm_col = header.index('name')
    dt_col = header.index('date')

    for row in reader:
        name = row[nm_col]
        dt_str = row[dt_col]

        dt = datetime.strptime(dt_str, DATE_FMT)
        min_dt = dt - timedelta(days=1)
        max_dt = dt + timedelta(days=1) # - timedelta(seconds=1)

        lookup[name] = [min_dt, max_dt]


# Create on-demand writer, and iterate over first, writing when we need to

writer = csv.writer(sys.stdout)

with open('first.csv', newline='') as f:
    reader = csv.reader(f)
    header = next(reader)
    nm_col = header.index('name')
    dt_col = header.index('timestamp')

    writer.writerow(header)

    for row in reader:
        name = row[nm_col]
        if name not in lookup:
            continue

        dt_str = row[dt_col]
        dt = datetime.strptime(dt_str, DATETIME_FMT)
        min_dt = lookup[name][0]
        max_dt = lookup[name][1]
        
        if dt < min_dt or dt > max_dt:
            continue

        writer.writerow(row)

并运行脚本:

% time python3 main.py > result.csv
python3 main.py > result.csv  69.93s user 0.40s system 98% cpu 1:11.07 total

% head -n5 result.csv 
id,name,timestamp
86400,foo,2000-01-02 00:00:00
86401,foo,2000-01-02 00:00:01
86402,foo,2000-01-02 00:00:02
86403,foo,2000-01-02 00:00:03

% tail -n5 result.csv 
259196,foo,2000-01-03 23:59:56
259197,foo,2000-01-03 23:59:57
259198,foo,2000-01-03 23:59:58
259199,foo,2000-01-03 23:59:59
259200,foo,2000-01-04 00:00:00  # is this right?

对我来说这看起来是正确的:仅记录 48 小时范围内的内容,以查找日期为中心。我不确定最后找到的条目,它是从第四个瞬间开始的——这就是注释掉的内容- timedelta(seconds=1)

package main

import (
    "encoding/csv"
    "io"
    "os"
    "time"
)

type LookupEntry struct {
    oneDayBefore time.Time
    oneDayAfter  time.Time
}

const DATE_FMT = "2006-01-02"
const DATETIME_FMT = "2006-01-02 15:04:05"

var lookup = make(map[string]LookupEntry)

func main() {
    makeLookupTable()
    findMatchingEntries()
}

func makeLookupTable() {
    f, _ := os.Open("second.csv")
    defer f.Close()

    r := csv.NewReader(f)
    r.Read() // Discard header
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        dt, _ := time.Parse(DATE_FMT, record[2])
        oneDayBefore := dt.AddDate(0, 0, -1)
        oneDayAfter := dt.AddDate(0, 0, 1)  // .Add(-time.Millisecond * 1000)
        lookup[record[1]] = LookupEntry{oneDayBefore, oneDayAfter}
    }
}

func findMatchingEntries() {
    f1, _ := os.Open("first.csv")
    defer f1.Close()

    w := csv.NewWriter(os.Stdout)

    r := csv.NewReader(f1)
    header, _ := r.Read()
    w.Write(header)

    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }

        lookupEntry, ok := lookup[record[1]]
        if !ok {
            continue
        }

        dt, _ := time.Parse(DATETIME_FMT, record[2])
        if dt.Before(lookupEntry.oneDayBefore) || dt.After(lookupEntry.oneDayAfter) {
            continue
        }

        w.Write(record)
    }
    w.Flush()
}

构建并运行测试:

% go build main.go

% time ./main > result.csv   
./main > result.csv  3.53s user 0.14s system 104% cpu 3.504 total

% head -n5 result.csv 
86400,foo,2000-01-02 00:00:00
86401,foo,2000-01-02 00:00:01
86402,foo,2000-01-02 00:00:02
86403,foo,2000-01-02 00:00:03
86404,foo,2000-01-02 00:00:04

% tail -n5 result.csv 
259196,foo,2000-01-03 23:59:56
259197,foo,2000-01-03 23:59:57
259198,foo,2000-01-03 23:59:58
259199,foo,2000-01-03 23:59:59
259200,foo,2000-01-04 00:00:00

相关内容