Dask 是什么?
介绍
Dask 是一个开源的 Python 库,专为并行计算和大数据处理设计。它提供了与 Pandas 和 NumPy 类似的高层次接口,同时支持将计算分布到多核、集群或云环境中。Dask 通过分块(chunking)和延迟计算(lazy evaluation)技术,实现了高效的数据处理和计算加速。
如何安装?
Dask 提供多种安装方式,你可以使用 conda
,pip
或者直接从源码安装。
具体的安装命令可查看官网:How to Install Dask。
核心组件
- Dask Arrays:分块处理的多维数组,支持并行线性代数运算。
- Dask DataFrame:分块处理的表格数据,兼容 Pandas 操作(如 groupby,join)。
- Dask Delayed:装饰器 @dask.delayed,用于并行化任意 Python 函数。
- Dask Distributed:分布式调度器,提供容错、动态负载均衡和诊断工具。
我们在项目中使用的时候可以通过以下方式引用:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
# 实际应用中可能不需要全部引用,根据具体所需要处理的数据来定。
对于我们项目里面表格的处理,DataFrame 是比较合适的。而 Dask DataFram 和 Pandas 是完全兼容的,API 是一致的。
Load Data
import pandas as pd
df = pd.read_parquet('s3://mybucket/myfile.parquet')
df.head()
import dask.dataframe as dd
df = dd.read_parquet('s3://mybucket/myfile.*.parquet')
df.head()
Data Processing
import pandas as pd
df = df[df.value >= 0]
joined = df.merge(other, on="account")
result = joined.groupby("account").value.mean()
result
import dask.dataframe as dd
df = df[df.value >= 0]
joined = df.merge(other, on="account")
result = joined.groupby("account").value.mean()
result.compute()
这里可以注意到 Dask 是并行处理版本的 Pandas。而且 Dask 是懒计算,只有当调用 .compute()
的时候才会进行。
与 Pandas 的关键区别
操作 | Pandas | Dask |
---|---|---|
数据过滤 | 立即执行,内存中处理 | 生成任务,延迟执行,并行分块 |
表合并 | 单机内存 | 分布式 Shuffle |
分组聚合 | 直接计算全局结果 | 局部聚合 + 全局合并 |
执行时机 | 立即执行 | 需要调用 .compute() 或者 .persist() 触发计算 |
示例
data_clean.py
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
import pandas as pd
# ----------------------------
# 步骤 1: 启动分布式集群(本地模拟)
# ----------------------------
# 创建一个本地集群(实际部署时替换为真实集群地址)
cluster = LocalCluster(
n_workers=4, # 4个工作进程
threads_per_worker=2, # 每个进程2个线程
memory_limit="4GB" # 每个进程内存限制
)
client = Client(cluster) # 连接到集群
# 打印集群信息
print(f"Dask Dashboard: {client.dashboard_link}") # 监控任务执行
# ----------------------------
# 步骤 2: 生成模拟数据(实际场景从 HDFS/S3 读取)
# ----------------------------
def generate_data():
# 创建分布式 CSV 文件(模拟10个分块)
for i in range(10):
users = np.random.choice([f"user_{x}" for x in range(1000)], size=10000)
amounts = np.round(np.random.normal(100, 50, 10000), 2)
amounts = np.where(amounts < 0, 0, amounts) # 生成少量负值
df = pd.DataFrame({"user": users, "amount": amounts})
df.to_csv(f"./data/transactions_{i}.csv", index=False)
generate_data() # 生成测试数据
# ----------------------------
# 步骤 3: 分布式数据处理
# ----------------------------
# 从分布式存储读取数据(此处模拟本地文件)
ddf = dd.read_csv("./data/transactions_*.csv")
# 过滤异常交易(惰性操作)
ddf_filtered = ddf[ddf["amount"] > 0]
# 计算每个用户的总金额
result = ddf_filtered.groupby("user")["amount"].sum()
# ----------------------------
# 步骤 4: 触发计算并持久化
# ----------------------------
# 将过滤后的数据缓存在集群内存中(加速后续操作)
ddf_filtered = client.persist(ddf_filtered) # 分布式缓存
# 计算最终结果并保存到分布式存储(此处模拟输出到本地 CSV)
result = result.compute() # 触发计算
result.to_csv("./output/user_total_amounts.csv", index_label="user")
# 关闭集群(实际生产环境通常长期运行)
client.close()
cluster.close()
install_plugin.py
from dask.distributed import PipInstall
from dask.dataframe import read_sql_table
from dask.distributed import Client
# 连接 Dask 集群
client = Client("tcp://dask-scheduler:8786")
plugin = PipInstall(packages=["sqlalchemy", "psycopg2-binary"], pip_options=["--upgrade"])
client.register_plugin(plugin)
# 分布式读取表数据(按 id 列分块)
ddf = read_sql_table(
"电视剧",
"postgresql://iger:ige@192.168.45.6:55462/postgres",
schema="chinese",
index_col="drama_id", # 用于分区的列(必须有索引)
npartitions=10, # 分区数(建议与CPU核心数对齐)
)
# 查看前 5 行(触发实际查询)
print(ddf.head())
贡献者
更新日志
2025/4/17 02:23
查看所有更新日志
081a2
-feat(docs): add new article于