Spark submit with Apache Airflow

For my workflow, I need to run a job on Spark, and I am trying to launch it from Airflow. However, I was hitting network timeouts, so I added the 'spark.network.timeout' option to the SparkSubmitOperator's conf, as shown below.

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

SLACK_CONN_ID = 'slack-search'

#---- Spark resource size
NUM_EXECUTOR = 56
EXECUTOR_CORES = 5
EXECUTOR_MEMORY = '20g'

JAR_FILE = 'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar'
EXECUTE_CLASS = 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay'
AGG_PERIOD = 1

def task_fail_slack_alert(context):
    # Failure callback: posts an alert to Slack using the webhook token
    # stored in the 'slack-search' connection.
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = "iac_keyword_stat_day failed"
    failed_alert = SlackWebhookOperator(
        task_id='slack_notify',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        channel='#search-alert',
        username='airflow',
        dag=dag)
    return failed_alert.execute(context=context)

def get_from_date():
    # NOTE: evaluated when the DAG file is parsed, not when the task runs
    return (datetime.now() - timedelta(days=AGG_PERIOD)).strftime('%Y%m%d')

def get_to_date():
    return datetime.now().strftime('%Y%m%d')

default_args = {
    'owner': 'search',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 21),
    'retries': 1,
    'retry_delay': timedelta(hours=1),
    'on_failure_callback': task_fail_slack_alert,
}

default_conf = {
    'spark.network.timeout': '800s',
    'spark.executor.heartbeatInterval': '60s'
}

dag = DAG('iac_keyword_stat_day', catchup=False, default_args=default_args, schedule_interval="0 6 * * *")

t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    dag=dag)
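
(Side note on the snippet above: get_from_date() and get_to_date() are evaluated once, when the scheduler parses the DAG file, so every run reuses the dates from parse time. Below is a minimal sketch of the same operator using Airflow's Jinja macros instead, assuming a 1.10-era Airflow where application_args is a templated field on SparkSubmitOperator; the exact macro expressions are an assumption about the intended one-day window:

t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    # Templated args are rendered at run time; -1 mirrors AGG_PERIOD = 1
    application_args=["--from", "{{ macros.ds_format(macros.ds_add(ds, -1), '%Y-%m-%d', '%Y%m%d') }}",
                      "--to", "{{ ds_nodash }}"],
    dag=dag)
)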

However, this produced the following Airflow error.

{__init__.py:1580} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 176, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 352, in submit
    spark_submit_cmd, returncode
AirflowException: Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
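
To find out why spark-submit exited with code 1 (Airflow reports only the exit code, not spark-submit's own stderr), one option is to replay the exact command from the error above outside Airflow. A quick diagnostic sketch, assuming spark-submit is on the PATH of the machine where it runs (kept Python 2/3 compatible, since the traceback shows a Python 2.7 worker):

import subprocess

# argv copied verbatim from the Airflow error above
cmd = ['spark-submit', '--master', 'yarn',
       '--conf', 'spark.executor.heartbeatInterval=60s',
       '--conf', 'spark.network.timeout=800s',
       '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g',
       '--name', 'airflow-spark',
       '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay',
       '--queue', 'default',
       'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar',
       '--from', '20190624', '--to', '20190625']

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
print(proc.returncode)  # 1 on failure
print(err)              # the real failure reason from spark-submit / YARN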

How can I fix this?
