For my workflow I need to run a job on Spark, and I am launching it from Airflow. However, the job keeps hitting network timeouts, so I added the 'spark.network.timeout' option to the SparkSubmitOperator conf, as shown below.
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
SLACK_CONN_ID = 'slack-search'
#---- Spark resource size
NUM_EXECUTOR = 56
EXECUTOR_CORES = 5
EXECUTOR_MEMORY = '20g'
JAR_FILE = 'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar'
EXECUTE_CLASS = 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay'
AGG_PERIOD = 1
# Failure callback: posts an alert to the search Slack channel.
def task_fail_slack_alert(context):
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = "iac_keyword_stat_day failed"
    failed_alert = SlackWebhookOperator(
        task_id='slack_notify',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        channel='#search-alert',
        username='airflow',
        dag=dag)
    return failed_alert.execute(context=context)
def get_from_date():
    return (datetime.now() - timedelta(days=AGG_PERIOD)).strftime('%Y%m%d')

def get_to_date():
    return datetime.now().strftime('%Y%m%d')
default_args = {
    'owner': 'search',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 21),
    'retries': 1,
    'retry_delay': timedelta(hours=1),
    'on_failure_callback': task_fail_slack_alert,
}
# Raised timeouts to work around the network timeout errors.
default_conf = {
    'spark.network.timeout': '800s',
    'spark.executor.heartbeatInterval': '60s'
}
dag = DAG('iac_keyword_stat_day', catchup=False, default_args=default_args, schedule_interval="0 6 * * *")
t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    dag=dag)
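For reference, each key in default_conf is passed through as a --conf key=value argument on the spark-submit command line, and the full command can be previewed without running it. A minimal sketch, assuming Airflow 1.10's contrib SparkSubmitHook (_build_spark_submit_command is a private helper, so it may differ between versions):

from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook

# Preview the exact command the operator will run, without executing it,
# so it can also be re-run by hand on a worker to see spark-submit's real stderr.
hook = SparkSubmitHook(
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    num_executors=NUM_EXECUTOR,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
)
print(' '.join(hook._build_spark_submit_command(JAR_FILE)))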
But when the DAG runs, it fails with the following Airflow error:
{__init__.py:1580} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 176, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 352, in submit
    spark_submit_cmd, returncode
AirflowException: Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
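The exception only reports the exit code; spark-submit's own output is what should explain the failure. One way to get more detail into the task log is the operator's verbose flag (a standard SparkSubmitOperator parameter in Airflow 1.10), sketched here:

t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    verbose=True,  # passes --verbose to spark-submit for extra debugging output
    dag=dag)

Since the master is YARN, once an application id shows up in that output, the driver and executor logs can be pulled with yarn logs -applicationId <appId>.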
How can I fix this?