通过 drmaa 在 qsub 命令中进行 Shell 变量扩展

通过 drmaa 在 qsub 命令中进行 Shell 变量扩展

我正在使用以下命令向 SGE (Sun Grid Engine) 运行批量作业提交python drmaa 绑定

对于批量作业提交,我正在提交一个 python 脚本,该脚本接受一个参数,并且可以通过 shebang 进行命令行可执行。为了正确参数化作业批量提交,我设置环境变量以通过选项传播到 python 脚本-v。我正在尝试根据SGE 在作业提交期间导出的$TASK_ID/环境变量在我的 zsh 环境中进行间接变量扩展。$SGE_TASK_ID

作为间接变量扩展的最小可重现示例,我正在尝试执行类似的操作,该操作在我的 shell 中有效。

export foo1=2
export num=1

echo $(tmp=foo$num; echo ${(P)tmp})

产生2

示例脚本job_script.py

#! /usr/bin/python
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("input_path", type=os.path.realpath)

def main(input_path):
    # do stuff
    ...

if __name__ == "__main__":
    args = parser.parse_args
    input_path = args.input_path
    main(input_path)

drmaa 提交脚本示例


import os

# add path to libs
os.environ["DMRAA_LIBRARY_PATH"] = "path to DMRAA shared object"
os.environ["SGE_ROOT"] = "path to SGE root directory"
import drmaa

input_dir_suffixes = [1, 2, 5, 7, 10, 11]

INPUT_BASE_DIR = "/home/mel/input_data"

base_qsub_options = {
    "P": "project",
    "q": "queue",
    "b": "y", # means is an executable
    "shell": "y", # start up shell
}
native_specification = " ".join(f"-{k} {v}" for k,v in base_qsub_options.items())
remote_command = "job_script.py"

num_task_ids = len(input_dir_suffixes)
task_start = 1
task_stop = num_task_ids + 1
task_step = 1
task_id_zip = zip(range(1, num_task_ids + 1), input_dir_suffixes) 
task_id_env_vars = {
   f"TASK_ID_{task_id}_SUFFIX": str(suffix) for task_id, suffix in task_id_zip 
}

io_task_id = r"$(tmp=SUFFIX_TASK_ID_$TASK_ID; echo ${(P)tmp)})"
arg_task_id = r"$(tmp=SUFFIX_TASK_ID_$SGE_TASK_ID; echo ${(P)tmp)})"

with drmaa.Session() as session:
    
    template = session.createJobTemplate()
    template.nativeSpecification = native_specification
    template.remoteCommand = remote_command
    template.jobEnvironment = task_id_env_vars
    template.outputPath = f":{INPUT_BASE_DIR}/output/{io_task_id}.o"
    template.outputPath = f":{INPUT_BASE_DIR}/error/{io_task_id}.e"

    args_list = [f"{INPUT_BASE_DIR}/data{arg_task_id}"]
    template.args = args_list
    session.runBulkJobs(template, task_start, task_stop - 1, task_step)
    session.deleteJobTemplate(template)

如果有语法错误,请道歉,我必须手动复制它,因为它在不同的系统上。

提交完成后

如果我在职qstat -j号码

我显示以下设置

sge_o_shell:         /usr/bin/zsh
stderr_path_list:    NONE:<node>:/home/mel/input_data/error_log/$(tmp=SUFFIX_TASK_ID_$TASK_ID; echo ${(P)tmp}).e
stdout_path_list:    NONE:<node>:/home/mel/input_data/output_log/$(tmp=SUFFIX_TASK_ID_$TASK_ID; echo ${(P)tmp}).o
job_args:            /home/mel/input_data/data$(tmp=SUFFIX_TASK_ID$SGE_TASK_ID; echo ${(P)tmp})
script_file:         job_script.py

env_list: 
SUFFIX_TASK_ID_1=1,SUFFIX_TASK_ID_2=2,SUFFIX_TASK_ID_3=5,SUFFIX_TASK_ID_4=7,SUFFIX_TASK_ID_5=10,SUFFIX_TASK_ID_6=11

错误日志和输出日志分别生成,但只有部分扩展。

例子

$(tmp=SUFFIX_TASK_ID1; echo ${(P)tmp}).e
$(tmp=SUFFIX_TASK_ID1; echo ${(P)tmp}).o

如果我们cat看到错误日志Illegal variable name

我想做的事情可能吗?

所以我假设某个地方没有正确激活我的 zsh。

答案1

SGE 使用 Bourne shell ( /bin/sh) 作为作业脚本的默认 shell,这可能会导致依赖于其他 shell 特定功能或语法的脚本出现问题。在您的情况下,您正在尝试使用一项zsh功能(使用 进行参数扩展${(P)tmp})。

问题似乎与您尝试在 DRMAA 作业中执行的 shell 变量扩展有关。当作业提交到 SGE 时,您的间接变量扩展 ( ) 似乎${(P)tmp}无法正确识别。

这里值得注意的是,虽然您的登录 shell 可能是zsh( sge_o_shell: /usr/bin/zsh),但该 shell解释qsub 命令行参数和 DRMAA 作业提交参数可能不是zsh。该${(P)tmp}语法特定于zsh, 并且不适用于其他 shell,例如bashsh,这可能是解释这些参数的 shell。

-v因此,当您在 DRMAA 脚本中使用选项(传递环境变量)和命令替换 ( )提交作业时$(...),SGE 可能不会使用 来解释命令的这些部分zsh,这就是zsh- 特定语法不起作用的原因。

这意味着您可能必须创建一个包装脚本来zsh执行变量扩展,然后使用结果调用 Python 脚本。该包装器脚本将作为作业脚本提交,并且它可以使用zsh的功能,因为您正在通过 运行它zsh

#!/usr/bin/env zsh

# use zsh for variable expansion
suffix=$(tmp=SUFFIX_TASK_ID_$TASK_ID; echo ${(P)tmp})

# call the python script with the result(s)
exec /path/to/job_script.py /home/mel/input_data/data$suffix

将包装器脚本作为 DRMAA 脚本中的作业提交,并指定它应使用以下命令运行zsh

...
# path to the wrapper script
remote_command = "/path/to/wrapper_script.zsh"

...
# specify that the job should be run with zsh
base_qsub_options = {
    ...
    "shell": "/usr/bin/zsh",
    ...
}
...

注意:您可能需要根据您的需要调整这些。

相关内容