我有一个用于抓取数据的 Python 程序,需要花费大量时间才能运行。为了使其并行化,我修改了代码,以便该程序可以在不同的机器上并行运行。我还创建了一个 docker 镜像并将其推送到 Dockerhub。
我尝试使用 Airflow 和 KubernetesPodOperator 创建 10 个 Kubernetes pod 来实现这一点。但到目前为止我还没有成功,而且这方面的文档也不够详细。还有其他方法可以实现这一点吗?GCP、Spark 和 Airflow 怎么样?或者只是由 Airflow 以某种方式协调的 GCE 机器?还有其他选择吗?
答案1
我建议你看看这个线、jug 或 ray 似乎是更简单的选择。
而且这里您将找到一份相当完整的并行处理(集群计算)解决方案列表。
以下是射线示例:
import ray
ray.init()
@ray.remote
def mapping_function(input):
return input + 1
results = ray.get([mapping_function.remote(i) for i in range(100)])
ray.util.multiprocessing.pool
或者,如果你正在使用 Python 多处理,你可以使用's Pool 而不是from multiprocessing.pool
's Pool将其扩展到集群。
看看这个邮政了解详情
您可以运行的示例代码(蒙特卡洛 Pi 估计):
import math
import random
import time
def sample(num_samples):
num_inside = 0
for _ in range(num_samples):
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
if math.hypot(x, y) <= 1:
num_inside += 1
return num_inside
def approximate_pi_distributed(num_samples):
from ray.util.multiprocessing.pool import Pool # NOTE: Only the import statement is changed.
pool = Pool()
start = time.time()
num_inside = 0
sample_batch_size = 100000
for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
num_inside += result
print("pi ~= {}".format((4*num_inside)/num_samples))
print("Finished in: {:.2f}s".format(time.time()-start))
注意:我自己也愿意在我的某个项目中实现同样的目标,我还没有尝试过,但我很快就会尝试。我会发布任何有趣的更新。不要犹豫,您也可以这样做。