将 csv 从 CF 写入存储桶时:'with open(filepath, "w") as MY_CSV:' 导致“FileNotFoundError: [Errno 2] 没有这样的文件或目录:”

将 csv 从 CF 写入存储桶时:'with open(filepath, "w") as MY_CSV:' 导致“FileNotFoundError: [Errno 2] 没有这样的文件或目录:”

FileNotFoundError: [Errno 2] No such file or directory当我尝试使用循环处理批量数据的 csv 写入器将 csv 文件写入存储桶时,出现此错误。有关该错误的完整 Cloud Function 日志信息:


File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k  ```

尽管这个 bucket_filepath 肯定存在:我可以上传一个空的虚拟文件并获取它的“gsutils URI”(右键单击文件右侧的三个点),bucket_filepath 看起来会相同:'gs://MY_BUCKET/MY_CSV.csv'

我检查了是否保存了一个虚拟的 pandas 数据框,pd.to_csv并且它使用相同的 bucket_filepath (!) 工作。

因此,一定有其他原因,可能是编写器不被接受,或者是with statement打开了该文件。

引发错误的代码如下。这是在本地服务器上的正常 cron 作业中,在 Google Cloud Function 之外运行的相同代码。我在引发错误的行周围添加了两个调试打印,不再print("Right after opening the file ...")显示。还显示了调用每个批次的子函数query_execute_batch()write_to_csv_file()但可能不是这里的问题,因为错误在写入打开 csv 文件时就已经发生了。

requirements.txt(然后作为模块导入):

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

来自main.py

def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")    
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")        
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

请注意,要使上述脚本正常运行,我需要导入gcsfs,以使存储桶可读写。否则,我可能需要一个 Google 云存储对象,例如:

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

然后使该存储桶中的文件具有进一步的功能,但这不是这里的目的。

下面是pd.to_csv可以运行的代码,它使用虚拟 SQL 查询的输出SELECT 1作为数据框的输入。这保存到同一个 bucket_filepath,当然原因可能不只是pd.to_csv()这样,也可能是因为数据集是虚拟的,而不是来自巨大的复杂 unicode 字符串SELECT query。或者有其他原因,我只是猜测。

if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
        index=False,
    )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]

我希望使用 csv 编写器,有机会使用 unicodecsv 模块以 unicode 格式写入,并有机会使用批次。

我可能愿意在 pandas 中更改为批次(loop + append模式或chunksize),就像将大型 Pandas Dataframes 分块写入 CSV 文件摆脱这个存储桶文件路径问题,但我宁愿使用现成的代码(永远不要触碰正在运行的系统)。

如何使用 csv 编写器保存该 csv,以便它可以在write模式 =中在存储桶中打开一个新文件with open(filepath, "w") as outcsv:

给出的函数write_to_csv_file()只是 Cloud Function 的一小部分,它使用了大量函数和级联函数。我无法在这里展示整个可重现的案例,希望能够通过经验或更简单的示例来回答。

答案1

答案是令人惊讶的。你必须gcsfs如果您想使用 写入文件,请导入并使用该模块open()

如果使用pd.to_csv()import gcsfs则不需要,但是gcsfs仍然requirements.txt需要pd.to_csv(),因此,pandasto_csv()似乎会自动使用它。

抛开惊讶pd.to_csv(),下面是回答问题的代码(已测试):

def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")
   

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)


    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")

        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

边注

不要像这样使用 csv 编写器。

它耗时太长了,参数为 5000 的 CFpd.to_csv()chunksize需要 62 秒就可以加载 700k 行并将其作为 csv 存储在 bucket 中,而具有批处理写入器的 CF 需要超过 9 分钟,这超过了超时限制。因此,我不得不改用pd.to_csv()并将我的数据转换为数据框。

相关内容