本文翻译整理自:https://py.iceberg.apache.org/
PyIceberg 入门指南
PyIceberg 是一个用于访问 Iceberg 表的 Python 实现,无需依赖 JVM 环境。
相关链接资源
- Github:https://github.com/apache/iceberg-python
- 官方文档:https://py.iceberg.apache.org/
- Iceberg 社区:https://iceberg.apache.org/community/
- 技术规范:https://iceberg.apache.org/spec/
安装
在安装 PyIceberg 之前,请确保您使用的是最新版本的 pip
:
pip install --upgrade pip
你可以从 PyPI 安装最新发布版本:
pip install "pyiceberg[s3fs,hive]"
您可以根据需求混合搭配可选依赖项:
键值 | 描述 |
---|---|
hive | 支持Hive元存储 |
hive-kerberos | 支持Kerberos环境下的Hive元存储 |
glue | 支持AWS Glue |
dynamodb | 支持AWS DynamoDB |
sql-postgres | 支持Postgresql作为后端的SQL目录 |
sql-sqlite | 支持SQLite作为后端的SQL目录 |
pyarrow | 使用PyArrow作为FileIO实现与对象存储交互 |
pandas | 同时安装PyArrow和Pandas |
duckdb | 同时安装PyArrow和DuckDB |
ray | 同时安装PyArrow、Pandas和Ray |
daft | 安装Daft |
polars | 安装Polars |
s3fs | 使用S3FS作为FileIO实现与对象存储交互 |
adlfs | 使用ADLFS作为FileIO实现与对象存储交互 |
snappy | 支持snappy Avro压缩 |
gcsfs | 使用GCSFS作为FileIO实现与对象存储交互 |
rest-sigv4 | 支持为REST目录生成AWS SIGv4认证头 |
您需要安装s3fs
、adlfs
、gcsfs
或pyarrow
才能从对象存储获取文件。
连接到目录
Iceberg利用目录作为组织表的集中管理位置。这可以是传统的Hive目录(用于将Iceberg表与其他表一起存储)、供应商解决方案(如AWS Glue目录),或是Iceberg自身REST协议的实现。请查看配置页面获取所有配置细节。
出于演示目的,我们将配置目录使用SqlCatalog
实现,该实现会将信息存储在本地sqlite
数据库中。同时我们会将目录配置为在本地文件系统而非对象存储中存储数据文件。由于可扩展性有限,此配置不应在生产环境中使用。
为Iceberg创建临时存储位置:
mkdir /tmp/warehouse
打开 Python 3 REPL 来设置目录:
from pyiceberg.catalog import load_catalog
warehouse_path = "/tmp/warehouse"
catalog = load_catalog(
"default",
**{
'type': 'sql',
"uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
"warehouse": f"file://{warehouse_path}",
},
)
sql
目录适用于本地测试,无需额外服务。如需尝试其他目录,请参阅配置说明。
写入PyArrow数据框
我们以出租车数据集为例,将其写入Iceberg表。
首先下载一个月的数据:
curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet
将其加载到您的 PyArrow 数据框中:
import pyarrow.parquet as pq
df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")
创建新的 Iceberg 表:
catalog.create_namespace("default")
table = catalog.create_table(
"default.taxi_dataset",
schema=df.schema,
)
将数据框追加到表中:
table.append(df)
len(table.scan().to_arrow())
已向表中写入 3,066,766 行数据。
现在生成用于模型训练的每英里小费特征:
import pyarrow.compute as pc
df = df.append_column("tip_per_mile", pc.divide(df["tip_amount"], df["trip_distance"]))
将表结构演进为包含新列:
with table.update_schema() as update_schema:
update_schema.union_by_name(df.schema)
现在我们可以将新的数据帧写入 Iceberg 表:
table.overwrite(df)
print(table.scan().to_arrow())
新列已成功添加:
taxi_dataset(
1: VendorID: optional long,
2: tpep_pickup_datetime: optional timestamp,
3: tpep_dropoff_datetime: optional timestamp,
4: passenger_count: optional double,
5: trip_distance: optional double,
6: RatecodeID: optional double,
7: store_and_fwd_flag: optional string,
8: PULocationID: optional long,
9: DOLocationID: optional long,
10: payment_type: optional long,
11: fare_amount: optional double,
12: extra: optional double,
13: mta_tax: optional double,
14: tip_amount: optional double,
15: tolls_amount: optional double,
16: improvement_surcharge: optional double,
17: total_amount: optional double,
18: congestion_surcharge: optional double,
19: airport_fee: optional double,
20: tip_per_mile: optional double
),
我们可以看到,有2,371,784行数据具有每英里小费记录:
df = table.scan(row_filter="tip_per_mile > 0").to_arrow()
len(df)
探索Iceberg数据与元数据文件
由于目录配置为使用本地文件系统,我们可以查看Iceberg如何存储上述操作产生的数据和元数据文件。
find /tmp/warehouse/
更多详情
具体细节请查看 CLI 或 Python API 页面。
配置
配置值设置
有三种方式传入配置:
- 使用
.pyiceberg.yaml
配置文件(推荐方式) - 通过环境变量传递
- 通过CLI或Python API直接传入凭证
配置文件可以存储在以下任一位置(按优先级排序):
1、PYICEBERG_HOME
环境变量指定的目录
2、用户主目录
3、当前工作目录
如需修改 .pyiceberg.yaml
的搜索路径,可以覆盖 PYICEBERG_HOME
环境变量。
另一种方式是通过环境变量进行配置:
export PYICEBERG_CATALOG__DEFAULT__URI=thrift://localhost:9083
export PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID=username
export PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY=password
Iceberg 读取的环境变量以 PYICEBERG_
开头,后接以下 yaml 结构,其中双下划线 __
表示嵌套字段,而下划线 _
会被转换为连字符 -
。
例如,PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID
会在 default
目录下设置 s3.access-key-id
。
表配置
Iceberg 表支持通过表属性来配置表的行为。
写入选项
键 | 选项 | 默认值 | 描述 |
---|---|---|---|
write.parquet.compression-codec | {uncompressed,zstd,gzip,snappy} | zstd | 设置Parquet压缩编解码器。 |
write.parquet.compression-level | 整数 | null | Parquet编解码器的压缩级别。若未设置,则由PyIceberg决定 |
write.parquet.row-group-limit | 行数 | 1048576 | 单个行组内条目数量的上限 |
write.parquet.page-size-bytes | 字节大小 | 1MB | 为列块内数据页的近似编码大小设置目标阈值 |
write.parquet.page-row-limit | 行数 | 20000 | 为列块内最大行数设置目标阈值 |
write.parquet.dict-size-bytes | 字节大小 | 2MB | 设置每个行组的字典页大小限制 |
write.metadata.previous-versions-max | 整数 | 100 | 提交后删除前保留的旧版本元数据文件最大数量。 |
write.metadata.delete-after-commit.enabled | 布尔值 | False | 是否在每次表提交后自动删除旧的已跟踪元数据文件。将保留最近的部分元数据文件,数量可通过属性write.metadata.previous-versions-max 设置。 |
write.object-storage.enabled | 布尔值 | False | 启用ObjectStoreLocationProvider ,为文件路径添加哈希组件。 |
write.object-storage.partitioned-paths | 布尔值 | True | 当启用对象存储时,控制分区值是否包含在文件路径中 |
write.py-location-provider.impl | 格式为module.ClassName 的字符串 | null | 可选项,自定义LocationProvider 实现 |
write.data.path | 指向位置的字符串 | {metadata.location}/data | 设置数据写入的位置。 |
write.metadata.path | 指向位置的字符串 | {metadata.location}/metadata | 设置元数据写入的位置。 |
表行为选项
键名 | 选项 | 默认值 | 描述 |
---|---|---|---|
commit.manifest.target-size-bytes | 字节大小 | 8388608 (8MB) | 合并清单文件时的目标大小 |
commit.manifest.min-count-to-merge | 清单数量 | 100 | 合并清单文件时的最小数量阈值 |
commit.manifest-merge.enabled | 布尔值 | False | 控制写入时是否自动合并清单文件 |
快速追加模式
与Java实现不同,PyIceberg默认采用快速追加模式,因此commit.manifest-merge.enabled
默认设置为False
。
FileIO
Iceberg采用FileIO这一可插拔模块的概念来实现文件的读取、写入和删除操作。默认情况下,PyIceberg会根据文件路径协议(如s3://
、gs://
等)自动尝试初始化匹配的FileIO实现,并优先使用已安装的第一个可用模块。支持以下协议及对应实现:
- s3, s3a, s3n:
PyArrowFileIO
,FsspecFileIO
- gs:
PyArrowFileIO
- file:
PyArrowFileIO
- hdfs:
PyArrowFileIO
- abfs, abfss:
FsspecFileIO
- oss:
PyArrowFileIO
用户也可显式指定FileIO实现:
键名 | 示例 | 说明 |
---|---|---|
py-io-impl | pyiceberg.io.fsspec.FsspecFileIO | 强制指定FileIO实现类,若无法加载则会明确报错 |
FileIO模块支持以下配置选项:
S3
键名 | 示例 | 描述 |
---|---|---|
s3.endpoint | https://10.0.19.25/ | 为FileIO配置访问S3服务的替代端点。可用于使S3FileIO兼容任何具有不同端点的S3兼容对象存储服务,或访问虚拟私有云中的私有S3端点。 |
s3.access-key-id | admin | 配置用于访问FileIO的静态访问密钥ID。 |
s3.secret-access-key | password | 配置用于访问FileIO的静态秘密访问密钥。 |
s3.session-token | AQoDYXdzEJr… | 配置用于访问FileIO的静态会话令牌。 |
s3.role-session-name | session | 为假定角色会话配置的可选标识符。 |
s3.role-arn | arn:aws:… | AWS角色ARN。如果提供此参数而非access_key和secret_key,将通过担任此角色获取临时凭证。 |
s3.signer | bearer | 配置FileIO的签名版本。 |
s3.signer.uri | http://my.signer:8080/s3 | 当远程签名URI与目录URI不同时进行配置。远程签名仅对FsspecFileIO 实现。最终请求将发送至<s3.signer.uri>/<s3.signer.endpoint> 。 |
s3.signer.endpoint | v1/main/s3-sign | 配置远程签名端点。远程签名仅对FsspecFileIO 实现。最终请求将发送至<s3.signer.uri>/<s3.signer.endpoint> (默认值:v1/aws/s3/sign)。 |
s3.region | us-west-2 | 配置用于初始化S3FileSystem 的默认区域。若未设置,PyArrowFileIO 会尝试自动解析区域(仅支持AWS S3存储桶)。 |
s3.resolve-region | False | 仅支持PyArrowFileIO ,启用时将始终尝试解析存储桶位置(仅支持AWS S3存储桶)。 |
s3.proxy-uri | http://my.proxy.com:8080 | 配置FileIO使用的代理服务器。 |
s3.connect-timeout | 60.0 | 配置套接字连接超时时间(单位:秒)。 |
s3.request-timeout | 60.0 | 在Windows和macOS上配置套接字读取超时时间(单位:秒)。 |
s3.force-virtual-addressing | False | 是否强制使用存储桶虚拟寻址。若为true,则始终启用虚拟寻址;若为false,则仅当endpoint_override为空时启用。适用于仅支持虚拟托管式访问的非AWS后端服务。 |
HDFS
键名 | 示例 | 描述 |
---|---|---|
hdfs.host | https://10.0.19.25/ | 配置要连接的HDFS主机地址 |
hdfs.port | 9000 | 配置要连接的HDFS端口号 |
hdfs.user | user | 配置用于连接的HDFS用户名 |
hdfs.kerberos_ticket | kerberos_ticket | 配置Kerberos票据缓存路径 |
Azure Data Lake
键 | 示例 | 描述 |
---|---|---|
adls.connection-string | AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqF…;BlobEndpoint=http://localhost/ | 连接字符串。可用于通过FileIO连接任何兼容ADLS的对象存储服务(如azurite),这些服务可能使用不同的端点。 |
adls.account-name | devstoreaccount1 | 要连接的账户名称 |
adls.account-key | Eby8vdM02xNOcqF… | 用于账户认证的密钥 |
adls.sas-token | NuHOuuzdQN7VRM%2FOpOeqBlawRCA845IY05h9eu1Yte4%3D | 共享访问签名 |
adls.tenant-id | ad667be4-b811-11ed-afa1-0242ac120002 | 租户ID |
adls.client-id | ad667be4-b811-11ed-afa1-0242ac120002 | 客户端ID |
adls.client-secret | oCA3R6P*ka#oa1Sms2J74z… | 客户端密钥 |
Google Cloud Storage
键名 | 示例 | 描述 |
---|---|---|
gcs.project-id | my-gcp-project | 为GCS FileIO配置Google Cloud项目ID。 |
gcs.oauth2.token | ya29.dr.AfM… | 用于临时访问的令牌字符串。 |
gcs.oauth2.token-expires-at | 1690971805918 | 配置基于访问令牌生成的凭据过期时间(自纪元起的毫秒数)。 |
gcs.access | read_only | 配置客户端访问权限,可选值为’read_only’、‘read_write’或’full_control’。 |
gcs.consistency | md5 | 配置文件写入时的校验方式,可选值为’none’、‘size’或’md5’。 |
gcs.cache-timeout | 60 | 配置对象元数据缓存的过期时间(秒)。 |
gcs.requester-pays | False | 配置是否使用请求方付费模式。 |
gcs.session-kwargs | {} | 配置传递给aiohttp.ClientSession的参数字典(可包含代理设置等)。 |
gcs.service.host | http://0.0.0.0:4443 | 配置GCS FileIO的替代访问端点(格式:协议://主机:端口)。未设置时默认使用环境变量"STORAGE_EMULATOR_HOST"的值,若该变量也未设置则使用标准Google端点。 |
gcs.default-location | US | 配置存储桶的默认创建位置(如’US’或’EUROPE-WEST3’)。 |
gcs.version-aware | False | 配置是否支持GCS存储桶的对象版本控制功能。 |
阿里云对象存储服务(OSS)
PyIceberg使用S3FileSystem类连接OSS存储桶,因为该服务兼容S3 SDK,只要端点采用虚拟托管样式地址即可。
键名 | 示例 | 描述 |
---|---|---|
s3.endpoint | https://s3.oss-your-bucket-region.aliyuncs.com/ | 为FileIO配置OSS服务的访问端点。必须使用示例中给出的S3兼容端点。 |
s3.access-key-id | admin | 配置用于访问FileIO的静态访问密钥ID。 |
s3.secret-access-key | password | 配置用于访问FileIO的静态密钥访问密码。 |
s3.session-token | AQoDYXdzEJr… | 配置用于访问FileIO的静态会话令牌。 |
s3.force-virtual-addressing | True | 是否使用存储桶的虚拟地址。必须设置为True,因为OSS只能通过虚拟托管样式地址访问。 |
PyArrow
键名 | 示例 | 描述 |
---|---|---|
pyarrow.use-large-types-on-read | True | 在表扫描时使用大型PyArrow类型,即large_string、large_binary和large_list字段类型。默认值为True。 |
位置提供器
Apache Iceberg 使用 LocationProvider
的概念来管理表数据文件的路径。在 PyIceberg 中,LocationProvider
模块设计为可插拔式,允许针对特定用例进行定制,并额外确定元数据文件的位置。表的 LocationProvider
可以通过表属性进行指定。
数据文件和元数据文件的位置均可通过配置表属性 write.data.path
和 write.metadata.path
分别进行自定义。
如需更细粒度的控制,可以重写 LocationProvider
的 new_data_location
和 new_metadata_location
方法,以定义生成文件路径的自定义逻辑。详见 加载自定义位置提供器
。
PyIceberg 默认使用 SimpleLocationProvider
来管理文件路径。
简单位置提供器
SimpleLocationProvider
提供以 {location}/data/
为前缀的路径,其中 location
来自表元数据。可以通过设置write.data.path
表配置来覆盖此默认行为。
例如,一个非分区表的数据文件可能具有如下路径:
s3://bucket/ns/table/data/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
当表被分区时,给定分区下的文件会被分组到一个子目录中,该分区键和值作为目录名称——这被称为Hive风格的分区路径格式。例如,按字符串列category
分区的表可能有一个数据文件,其路径如下:
s3://bucket/ns/table/data/category=orders/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
对象存储位置提供器
PyIceberg 提供了 ObjectStoreLocationProvider
以及可选的分区排除优化功能,专为存储在对象存储中的表设计。如需了解这些配置的更多背景和动机,请参阅Iceberg Java实现的文档。
当多个文件存储在相同前缀下时,S3等云对象存储通常会对前缀进行请求限流,导致性能下降。ObjectStoreLocationProvider
通过在文件路径中注入二进制目录形式的确定性哈希值来应对这一问题,从而将文件分散到更多对象存储前缀中。
路径以 {location}/data/
为前缀,其中 location
来自表元数据,其方式类似于 SimpleLocationProvider
。这可以通过设置write.data.path
表配置来覆盖。
例如,一个按字符串列 category
分区的表可能具有如下位置的数据文件:(注意额外的二进制目录)
s3://bucket/ns/table/data/0101/0110/1001/10110010/category=orders/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
为表启用 ObjectStoreLocationProvider
需要显式将其 write.object-storage.enabled
表属性设置为 True
。
分区排除
当使用 ObjectStoreLocationProvider
时,表属性 write.object-storage.partitioned-paths
(默认为 True
)可设置为 False
,作为针对对象存储的额外优化。这将完全省略数据文件路径中的分区键和值,以进一步减小键大小。禁用该属性后,上述相同的数据文件将被写入以下路径:(注意缺少 category=orders
)
s3://bucket/ns/table/data/1101/0100/1011/00111010-00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
加载自定义位置提供器
与 FileIO 类似,可以通过具体继承抽象基类 LocationProvider
来为表提供自定义的 LocationProvider
。
需要将表属性 write.py-location-provider.impl
设置为自定义 LocationProvider
的完全限定名称(例如 mymodule.MyLocationProvider
)。请注意,LocationProvider
是按表配置的,允许为不同表提供不同的位置分配方案。还需注意,Iceberg 的 Java 实现使用不同的表属性 write.location-provider.impl
来配置自定义 Java 实现。
下面展示了一个自定义 LocationProvider
的实现示例。
import uuid
class UUIDLocationProvider(LocationProvider):
def __init__(self, table_location: str, table_properties: Properties):
super().__init__(table_location, table_properties)
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
# Can use any custom method to generate a file path given the partitioning information and file name
prefix = f"{self.table_location}/{uuid.uuid4()}"
return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"
目录
PyIceberg 目前原生支持 REST、SQL、Hive、Glue 和 DynamoDB 类型的目录。此外,您也可以直接设置目录实现:
键名 | 示例 | 描述 |
---|---|---|
type | rest | 目录类型,可选值为 rest 、sql 、hive 、glue 、dymamodb ,默认为 rest |
py-catalog-impl | mypackage.mymodule.MyCatalog | 显式设置目录实现类,若无法加载则会明确报错 |
REST Catalog
catalog:
default:
uri: http://rest-catalog/ws/
credential: t-1234:secret
default-mtls-secured-catalog:
uri: https://rest-catalog/ws/
ssl:
client:
cert: /absolute/path/to/client.crt
key: /absolute/path/to/client.key
cabundle: /absolute/path/to/cabundle.pem
键 | 示例 | 描述 |
---|---|---|
uri | https://rest-catalog/ws | 标识REST服务器的URI |
ugi | t-1234:secret | Hive客户端使用的Hadoop UGI凭证 |
credential | t-1234:secret | 初始化目录时用于OAuth2凭证流的认证信息 |
token | FEW23.DFSDF.FSDF | 用于Authorization 头的Bearer令牌值 |
scope | openid offline corpds:ds:profile | 请求安全令牌的期望作用域(默认为catalog) |
resource | rest_catalog.iceberg.com | 目标资源或服务的URI |
audience | rest_catalog | 目标资源或服务的逻辑名称 |
rest.sigv4-enabled | true | 使用AWS SigV4协议对REST服务器请求进行签名 |
rest.signing-region | us-east-1 | 使用SigV4签名请求时指定的区域 |
rest.signing-name | execute-api | 使用SigV4签名请求时指定的服务签名名称 |
oauth2-server-uri | https://auth-service/cc | 客户端凭证认证使用的认证URL(默认为uri + ‘v1/oauth/tokens’) |
RESTCatalog中的请求头配置
要在RESTCatalog中配置自定义请求头,请在目录属性中添加以header.
为前缀的配置项。这样可以确保所有发送至REST服务的HTTP请求都包含指定的请求头。
catalog:
default:
uri: http://rest-catalog/ws/
credential: t-1234:secret
header.content-type: application/vnd.api+json
RESTCatalog规范定义的特定请求头包括:
键名 | 选项 | 默认值 | 描述 |
---|---|---|---|
header.X-Iceberg-Access-Delegation | {vended-credentials,remote-signing} | vended-credentials | 向服务器表明客户端支持通过逗号分隔的访问机制列表进行委托访问。服务器可以选择通过任意或完全不通过请求的机制来提供访问权限 |
SQL Catalog
SQL Catalog需要一个数据库作为其后端。PyIceberg通过psycopg2支持PostgreSQL和SQLite。必须使用uri
属性配置数据库连接。init_catalog_tables是可选的,默认值为True。如果设置为False,初始化SQLCatalog时将不会创建目录表。详情请参阅SQLAlchemy的URL格式文档:
对于PostgreSQL:
catalog:
default:
type: sql
uri: postgresql+psycopg2://username:password@localhost/mydatabase
init_catalog_tables: false
对于SQLite的情况:
仅限开发用途
SQLite并非为并发场景设计,建议仅将此目录用于探索性或开发目的。
catalog:
default:
type: sql
uri: sqlite:tmp/pyiceberg.db
init_catalog_tables: false
键名 | 示例 | 默认值 | 描述 |
---|---|---|---|
uri | postgresql+psycopg2://username:password@localhost/mydatabase | 目录数据库的SQLAlchemy后端URL(参见URL格式文档) | |
echo | true | false | SQLAlchemy引擎的echo参数,用于将所有语句记录到默认日志处理器 |
pool_pre_ping | true | false | SQLAlchemy引擎的pool_pre_ping参数,用于在每次检出时测试连接活性 |
内存目录
内存目录基于SqlCatalog
构建,并使用SQLite内存数据库作为其后端。
它适用于测试、演示和实验环境,但不适合生产环境,因为不支持并发访问。
catalog:
default:
type: in-memory
warehouse: /tmp/pyiceberg/warehouse
键名 | 示例 | 默认值 | 描述 |
---|---|---|---|
warehouse | /tmp/pyiceberg/warehouse | file:///tmp/iceberg/warehouse | 内存目录将存储其数据文件的目录路径。 |
Hive Catalog
catalog:
default:
uri: thrift://localhost:9083
s3.endpoint: http://localhost:9000
s3.access-key-id: admin
s3.secret-access-key: password
键名 | 示例 | 描述 |
---|---|---|
hive.hive2-compatible | true | 启用 Hive 2.x 兼容模式 |
hive.kerberos-authentication | true | 通过 Kerberos 进行认证 |
使用 Hive 2.x 时,请确保设置兼容性标志:
catalog:
default:
...
hive.hive2-compatible: true
Glue Catalog
您的AWS凭证可以直接通过Python API传递。否则,请参考如何配置AWS凭证在本地设置您的AWS账户凭证。
catalog:
default:
type: glue
glue.access-key-id: <ACCESS_KEY_ID>
glue.secret-access-key: <SECRET_ACCESS_KEY>
glue.session-token: <SESSION_TOKEN>
glue.region: <REGION_NAME>
s3.endpoint: http://localhost:9000
s3.access-key-id: admin
s3.secret-access-key: password
catalog:
default:
type: glue
glue.profile-name: <PROFILE_NAME>
glue.region: <REGION_NAME>
s3.endpoint: http://localhost:9000
s3.access-key-id: admin
s3.secret-access-key: password
客户端专属属性
glue.*
属性仅适用于 Glue Catalog。如需在 Glue Catalog 和 S3 FileIO 中使用相同的凭证,可设置 client.*
属性。详见统一AWS凭证章节。
键名 | 示例值 | 描述 |
---|---|---|
glue.id | 111111111111 | 配置Glue Catalog的12位数字ID |
glue.skip-archive | true | 配置是否跳过旧表版本的归档。默认为true |
glue.endpoint | https://glue.us-east-1.amazonaws.com | 配置GlueCatalog访问Glue服务的备用端点 |
glue.profile-name | default | 配置访问Glue Catalog使用的静态profile |
glue.region | us-east-1 | 设置Glue Catalog的区域 |
glue.access-key-id | admin | 配置访问Glue Catalog使用的静态访问密钥ID |
glue.secret-access-key | password | 配置访问Glue Catalog使用的静态密钥 |
glue.session-token | AQoDYXdzEJr… | 配置访问Glue Catalog使用的静态会话令牌 |
glue.max-retries | 10 | 配置Glue服务调用的最大重试次数 |
glue.retry-mode | standard | 配置Glue服务的重试模式。默认为standard |
已移除属性
profile_name
、region_name
、aws_access_key_id
、aws_secret_access_key
和 aws_session_token
属性已在0.8.0版本弃用并移除
DynamoDB Catalog
若需使用AWS DynamoDB作为元数据目录,可通过以下两种方式配置pyiceberg,并参考如何配置AWS凭证在本地设置AWS账户凭证。如需为DynamoDB Catalog和S3 FileIO使用相同凭证,可设置client.*
属性。
catalog:
default:
type: dynamodb
table-name: iceberg
如果您更倾向于显式传递凭据给客户端,而不是依赖环境变量,
catalog:
default:
type: dynamodb
table-name: iceberg
dynamodb.access-key-id: <ACCESS_KEY_ID>
dynamodb.secret-access-key: <SECRET_ACCESS_KEY>
dynamodb.session-token: <SESSION_TOKEN>
dynamodb.region: <REGION_NAME>
s3.endpoint: http://localhost:9000
s3.access-key-id: admin
s3.secret-access-key: password
客户端专属属性
dynamodb.*
属性仅适用于 DynamoDB Catalog。如需在 DynamoDB Catalog 和 S3 FileIO 中使用相同凭证,可设置 client.*
属性。详见统一AWS凭证章节。
键名 | 示例 | 描述 |
---|---|---|
dynamodb.profile-name | default | 配置用于访问 DynamoDB Catalog 的静态凭证配置文件 |
dynamodb.region | us-east-1 | 设置 DynamoDB Catalog 所在区域 |
dynamodb.access-key-id | admin | 配置用于访问 DynamoDB Catalog 的静态访问密钥ID |
dynamodb.secret-access-key | password | 配置用于访问 DynamoDB Catalog 的静态密钥访问密码 |
dynamodb.session-token | AQoDYXdzEJr… | 配置用于访问 DynamoDB Catalog 的静态会话令牌 |
已移除属性
profile_name
、region_name
、aws_access_key_id
、aws_secret_access_key
和 aws_session_token
属性已在 0.8.0 版本弃用并移除
自定义目录实现
如需加载任何自定义目录实现,可按如下方式设置目录配置:
catalog:
default:
py-catalog-impl: mypackage.mymodule.MyCatalog
custom-key1: value1
custom-key2: value2
统一AWS凭证配置
您可以通过配置client.*
属性来显式设置Glue/DynamoDB Catalog和S3 FileIO的AWS凭证。例如:
catalog:
default:
type: glue
client.access-key-id: <ACCESS_KEY_ID>
client.secret-access-key: <SECRET_ACCESS_KEY>
client.region: <REGION_NAME>
为Glue Catalog和S3 FileIO配置AWS凭证。
键名 | 示例 | 描述 |
---|---|---|
client.region | us-east-1 | 设置Glue/DynamoDB Catalog和S3 FileIO的区域 |
client.access-key-id | admin | 配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态访问密钥ID |
client.secret-access-key | password | 配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态秘密访问密钥 |
client.session-token | AQoDYXdzEJr… | 配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态会话令牌 |
client.role-session-name | session | 可选参数,用于标识所承担角色的会话名称 |
client.role-arn | arn:aws:… | AWS角色ARN。如果提供此参数而非access_key和secret_key,将通过承担此角色来获取临时凭证 |
属性优先级
若设置了服务特定属性,client.*
属性将被覆盖。例如,若client.region
设为us-west-1
而s3.region
设为us-east-1
,则S3 FileIO将使用us-east-1
作为区域。
并发控制
PyIceberg 采用多线程机制来并行化操作。您可以通过以下两种方式配置工作线程数量:
1、在配置文件中设置 max-workers
参数
2、通过环境变量 PYICEBERG_MAX_WORKERS
指定
默认线程数会根据系统硬件和 Python 版本自动确定,具体细节可参考 Python 官方文档。
向后兼容性
旧版Java实现(<1.4.0
)错误地将current-snapshot-id
这个可选属性假定为TableMetadata中的必填属性。这意味着如果元数据文件中缺少current-snapshot-id
(例如建表时),应用程序会因无法加载表而抛出异常。该问题在较新的Iceberg版本中已修正。但用户仍可通过配置强制PyIceberg生成兼容旧版规范的元数据文件,只需在配置文件中将legacy-current-snapshot-id
属性设为"True",或设置环境变量PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID
。更多技术细节请参阅PR讨论。
纳秒级支持
PyIceberg 当前在 TimestampType 中仅支持微秒级精度。PyArrow 时间戳类型若为 ‘s’ 或 ‘ms’ 精度,在写入时会自动向上转换为 ‘us’ 精度时间戳。如果需要,‘ns’ 精度的时间戳在写入时也可自动向下转换。可通过在配置文件中设置 downcast-ns-timestamp-to-us-on-write
属性为 “True”,或设置环境变量 PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE
来启用此功能。关于纳秒级支持的长期规划详情,请参阅纳秒时间戳提案文档
Python 命令行工具
PyIceberg 提供了一个 CLI 工具,安装 pyiceberg
包后即可使用。
虽然可以通过 --uri
和 --credential
参数传递 Catalog 路径,但建议按照 Catalog 章节所述配置 ~/.pyiceberg.yaml
文件。
➜ pyiceberg --help
Usage: pyiceberg [OPTIONS] COMMAND [ARGS]...
Options:
--catalog TEXT
--verbose BOOLEAN
--output [text|json]
--ugi TEXT
--uri TEXT
--credential TEXT
--help Show this message and exit.
Commands:
describe Describes a namespace xor table
drop Operations to drop a namespace or table
list Lists tables or namespaces
location Returns the location of the table
properties Properties on tables/namespaces
rename Renames a table
schema Gets the schema of the table
spec Returns the partition spec of the table
uuid Returns the UUID of the table
这个示例假设您已设置了默认目录。如果想加载其他目录(例如上文中的 rest 示例),则需要设置 --catalog rest
。
➜ pyiceberg list
default
nyc
➜ pyiceberg list nyc
nyc.taxis
➜ pyiceberg describe nyc.taxis
Table format version 1
Metadata location file:/.../nyc.db/taxis/metadata/00000-aa3a3eac-ea08-4255-b890-383a64a94e42.metadata.json
Table UUID 6cdfda33-bfa3-48a7-a09e-7abb462e3460
Last Updated 1661783158061
Partition spec []
Sort order []
Current schema Schema, id=0
├── 1: VendorID: optional long
├── 2: tpep_pickup_datetime: optional timestamptz
├── 3: tpep_dropoff_datetime: optional timestamptz
├── 4: passenger_count: optional double
├── 5: trip_distance: optional double
├── 6: RatecodeID: optional double
├── 7: store_and_fwd_flag: optional string
├── 8: PULocationID: optional long
├── 9: DOLocationID: optional long
├── 10: payment_type: optional long
├── 11: fare_amount: optional double
├── 12: extra: optional double
├── 13: mta_tax: optional double
├── 14: tip_amount: optional double
├── 15: tolls_amount: optional double
├── 16: improvement_surcharge: optional double
├── 17: total_amount: optional double
├── 18: congestion_surcharge: optional double
└── 19: airport_fee: optional double
Current snapshot Operation.APPEND: id=5937117119577207079, schema_id=0
Snapshots Snapshots
└── Snapshot 5937117119577207079, schema 0: file:/.../nyc.db/taxis/metadata/snap-5937117119577207079-1-94656c4f-4c66-4600-a4ca-f30377300527.avro
Properties owner root
write.format.default parquet
或者输出为JSON格式以便自动化处理:
➜ pyiceberg --output json describe nyc.taxis | jq
{
"identifier": [
"nyc",
"taxis"
],
"metadata_location": "file:/.../nyc.db/taxis/metadata/00000-aa3a3eac-ea08-4255-b890-383a64a94e42.metadata.json",
"metadata": {
"location": "file:/.../nyc.db/taxis",
"table-uuid": "6cdfda33-bfa3-48a7-a09e-7abb462e3460",
"last-updated-ms": 1661783158061,
"last-column-id": 19,
"schemas": [
{
"type": "struct",
"fields": [
{
"id": 1,
"name": "VendorID",
"type": "long",
"required": false
},
...
{
"id": 19,
"name": "airport_fee",
"type": "double",
"required": false
}
],
"schema-id": 0,
"identifier-field-ids": []
}
],
"current-schema-id": 0,
"partition-specs": [
{
"spec-id": 0,
"fields": []
}
],
"default-spec-id": 0,
"last-partition-id": 999,
"properties": {
"owner": "root",
"write.format.default": "parquet"
},
"current-snapshot-id": 5937117119577207000,
"snapshots": [
{
"snapshot-id": 5937117119577207000,
"timestamp-ms": 1661783158061,
"manifest-list": "file:/.../nyc.db/taxis/metadata/snap-5937117119577207079-1-94656c4f-4c66-4600-a4ca-f30377300527.avro",
"summary": {
"operation": "append",
"spark.app.id": "local-1661783139151",
"added-data-files": "1",
"added-records": "2979431",
"added-files-size": "46600777",
"changed-partition-count": "1",
"total-records": "2979431",
"total-files-size": "46600777",
"total-data-files": "1",
"total-delete-files": "0",
"total-position-deletes": "0",
"total-equality-deletes": "0"
},
"schema-id": 0
}
],
"snapshot-log": [
{
"snapshot-id": "5937117119577207079",
"timestamp-ms": 1661783158061
}
],
"metadata-log": [],
"sort-orders": [
{
"order-id": 0,
"fields": []
}
],
"default-sort-order-id": 0,
"refs": {
"main": {
"snapshot-id": 5937117119577207000,
"type": "branch"
}
},
"format-version": 1,
"schema": {
"type": "struct",
"fields": [
{
"id": 1,
"name": "VendorID",
"type": "long",
"required": false
},
...
{
"id": 19,
"name": "airport_fee",
"type": "double",
"required": false
}
],
"schema-id": 0,
"identifier-field-ids": []
},
"partition-spec": []
}
}
Python API
PyIceberg 围绕目录(catalog)机制来加载表。第一步是实例化一个用于加载表的目录。我们使用以下配置来定义一个名为 prod
的目录:
catalog:
prod:
uri: http://rest-catalog/ws/
credential: t-1234:secret
请注意,可以在同一个 .pyiceberg.yaml
文件中定义多个目录。
catalog:
hive:
uri: thrift://127.0.0.1:9083
s3.endpoint: http://127.0.0.1:9000
s3.access-key-id: admin
s3.secret-access-key: password
rest:
uri: https://rest-server:8181/
warehouse: my-warehouse
通过调用 load_catalog(name="hive")
和 load_catalog(name="rest")
在 Python 中加载。
这些配置信息必须放置在名为 .pyiceberg.yaml
的文件中,该文件可以位于以下目录之一:
$HOME
或%USERPROFILE%
目录(根据操作系统是 Unix 类或 Windows 类而定)- 当前工作目录
$PYICEBERG_HOME
目录(如果设置了对应的环境变量)
有关配置选项的更多细节,请参阅专用页面。
然后加载名为 prod
的 catalog:
from pyiceberg.catalog import load_catalog
catalog = load_catalog(
"docs",
**{
"uri": "http://127.0.0.1:8181",
"s3.endpoint": "http://127.0.0.1:9000",
"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
}
)
让我们创建一个命名空间:
catalog.create_namespace("docs_example")
然后列出它们:
ns = catalog.list_namespaces()
assert ns == [("docs_example",)]
然后列出命名空间中的表:
catalog.list_tables("docs_example")
创建表
要通过目录创建表:
from pyiceberg.schema import Schema
from pyiceberg.types import (
TimestampType,
FloatType,
DoubleType,
StringType,
NestedField,
StructType,
)
schema = Schema(
NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
NestedField(
field_id=5,
name="details",
field_type=StructType(
NestedField(
field_id=4, name="created_by", field_type=StringType(), required=False
),
),
required=False,
),
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
partition_spec = PartitionSpec(
PartitionField(
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
)
)
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform
# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
catalog.create_table(
identifier="docs_example.bids",
schema=schema,
location="s3://pyiceberg",
partition_spec=partition_spec,
sort_order=sort_order,
)
创建表时,模式中的所有ID都会被重新分配以确保唯一性。
要通过pyarrow模式创建表:
import pyarrow as pa
schema = pa.schema(
[
pa.field("foo", pa.string(), nullable=True),
pa.field("bar", pa.int32(), nullable=False),
pa.field("baz", pa.bool_(), nullable=True),
]
)
catalog.create_table(
identifier="docs_example.bids",
schema=schema,
)
要以事务方式原子化地创建表并进行后续修改:
with catalog.create_table_transaction(
identifier="docs_example.bids",
schema=schema,
location="s3://pyiceberg",
partition_spec=partition_spec,
sort_order=sort_order,
) as txn:
with txn.update_schema() as update_schema:
update_schema.add_column(path="new_column", field_type=StringType())
with txn.update_spec() as update_spec:
update_spec.add_identity("symbol")
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
加载表
目录表
加载 bids
表:
table = catalog.load_table("docs_example.bids")
# Equivalent to:
table = catalog.load_table(("docs_example", "bids"))
# The tuple syntax can be used if the namespace or table contains a dot.
这将返回一个表示 Iceberg 表的 Table
对象,该表可被查询和修改。
静态表
要直接从元数据文件加载表(即不使用目录),可以按如下方式使用 StaticTable
:
from pyiceberg.table import StaticTable
static_table = StaticTable.from_metadata(
"s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
静态表被视为只读。
检查表是否存在
要检查 bids
表是否存在:
catalog.table_exists("docs_example.bids")
如果表已存在,则返回 True
。
写入支持
从 PyIceberg 0.6.0 版本开始,通过 Arrow 实现了写入支持功能。让我们来看一个 Arrow 表的示例:
import pyarrow as pa
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
{"city": "Drachten", "lat": 53.11254, "long": 6.0989},
{"city": "Paris", "lat": 48.864716, "long": 2.349014},
],
)
接下来,根据该模式创建表:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("default")
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType
schema = Schema(
NestedField(1, "city", StringType(), required=False),
NestedField(2, "lat", DoubleType(), required=False),
NestedField(3, "long", DoubleType(), required=False),
)
tbl = catalog.create_table("default.cities", schema=schema)
现在将数据写入表:
快速追加
PyIceberg 默认采用快速追加方式以最小化写入数据量。这种方式能实现快速写入,降低冲突可能性。快速追加的缺点是会比普通提交产生更多元数据。计划实现压缩功能,当达到阈值时将自动重写所有元数据,以保持读取性能。
tbl.append(df)
# or
tbl.overwrite(df)
数据被写入表中,当使用 tbl.scan().to_arrow()
读取表时:
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]
由于目前还没有数据,你们都可以使用 append(df)
或 overwrite(df)
。如果想添加更多数据,可以再次使用 .append()
:
df = pa.Table.from_pylist(
[{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)
tbl.append(df)
读取表 tbl.scan().to_arrow()
时,可以看到 Groningen
现在也包含在表中:
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
嵌套列表表示不同的 Arrow 缓冲区,其中首次写入会生成一个缓冲区,而第二次追加操作会写入另一个独立缓冲区。这是预期行为,因为系统需要读取两个 Parquet 文件。
为避免写入过程中出现类型错误,您可以使用 Iceberg 表模式来强制指定 PyArrow 表的类型:
from pyiceberg.catalog import load_catalog
import pyarrow as pa
catalog = load_catalog("default")
table = catalog.load_table("default.cities")
schema = table.schema().as_arrow()
df = pa.Table.from_pylist(
[{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=schema
)
table.append(df)
您可以通过调用 tbl.delete()
并指定所需的 delete_filter
来删除表中的部分数据。
tbl.delete(delete_filter="city == 'Paris'")
在上面的例子中,所有城市字段值等于Paris
的记录都将被删除。运行tbl.scan().to_arrow()
现在会返回:
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]
部分覆盖
使用 overwrite
API 时,可以通过 overwrite_filter
先删除表中符合过滤条件的数据,然后再追加新数据到表中。
例如,对于以下方式创建的 Iceberg 表:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("default")
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType
schema = Schema(
NestedField(1, "city", StringType(), required=False),
NestedField(2, "lat", DoubleType(), required=False),
NestedField(3, "long", DoubleType(), required=False),
)
tbl = catalog.create_table("default.cities", schema=schema)
随着初始数据填充到表中:
import pyarrow as pa
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
{"city": "Drachten", "lat": 53.11254, "long": 6.0989},
{"city": "Paris", "lat": 48.864716, "long": 2.349014},
],
)
tbl.append(df)
你可以用 New York
的记录覆盖 Paris
的记录:
from pyiceberg.expressions import EqualTo
df = pa.Table.from_pylist(
[
{"city": "New York", "lat": 40.7128, "long": 74.0060},
]
)
tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))
这将通过 tbl.scan().to_arrow()
产生如下结果:
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
lat: [[40.7128],[52.371807,37.773972,53.11254]]
long: [[74.006],[4.896029,-122.431297,6.0989]]
如果 PyIceberg 表已分区,您可以使用 tbl.dynamic_partition_overwrite(df)
来替换现有分区为数据框中提供的新分区。系统会自动从提供的 Arrow 表中检测需要替换的分区。例如,对于一个在 "city"
字段上指定了分区的 Iceberg 表:
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, NestedField, StringType
schema = Schema(
NestedField(1, "city", StringType(), required=False),
NestedField(2, "lat", DoubleType(), required=False),
NestedField(3, "long", DoubleType(), required=False),
)
tbl = catalog.create_table(
"default.cities",
schema=schema,
partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="city_identity"))
)
我们想要覆盖 "Paris"
分区的数据:
import pyarrow as pa
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
{"city": "Drachten", "lat": 53.11254, "long": 6.0989},
{"city": "Paris", "lat": -48.864716, "long": -2.349014},
],
)
tbl.append(df)
然后我们可以用这个 arrow 表调用 dynamic_partition_overwrite
:
df_corrected = pa.Table.from_pylist([
{"city": "Paris", "lat": 48.864716, "long": 2.349014}
])
tbl.dynamic_partition_overwrite(df_corrected)
这将通过 tbl.scan().to_arrow()
产生以下结果:
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["Paris"],["Amsterdam"],["Drachten"],["San Francisco"]]
lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
Upsert
PyIceberg 支持 upsert 操作,这意味着它能够将 Arrow 表合并到 Iceberg 表中。系统会根据标识字段来判断行是否相同。如果表中已存在某行数据,则会更新该行;如果找不到匹配的行,则会插入新行。
假设有以下包含数据的表:
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType
import pyarrow as pa
schema = Schema(
NestedField(1, "city", StringType(), required=True),
NestedField(2, "inhabitants", IntegerType(), required=True),
# Mark City as the identifier field, also known as the primary-key
identifier_field_ids=[1]
)
tbl = catalog.create_table("default.cities", schema=schema)
arrow_schema = pa.schema(
[
pa.field("city", pa.string(), nullable=False),
pa.field("inhabitants", pa.int32(), nullable=False),
]
)
# Write some data
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "inhabitants": 921402},
{"city": "San Francisco", "inhabitants": 808988},
{"city": "Drachten", "inhabitants": 45019},
{"city": "Paris", "inhabitants": 2103000},
],
schema=arrow_schema
)
tbl.append(df)
接下来,我们将向 Iceberg 表执行 upsert 操作:
df = pa.Table.from_pylist(
[
# Will be updated, the inhabitants has been updated
{"city": "Drachten", "inhabitants": 45505},
# New row, will be inserted
{"city": "Berlin", "inhabitants": 3432000},
# Ignored, already exists in the table
{"city": "Paris", "inhabitants": 2103000},
],
schema=arrow_schema
)
upd = tbl.upsert(df)
assert upd.rows_updated == 1
assert upd.rows_inserted == 1
PyIceberg 会自动检测哪些行需要更新、插入或可以直接忽略。
检查表结构
可以通过检查表来查看表的元数据信息。
时间旅行功能
要使用时间旅行功能检查表的元数据,请调用inspect table
方法并传入snapshot_id
参数。该功能支持除snapshots
和refs
之外的所有元数据表。
table.inspect.entries(snapshot_id=805611270568163028)
快照
检查表的快照:
table.inspect.snapshots()
pyarrow.Table
committed_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
operation: string
manifest_list: string not null
summary: map<string, string>
child 0, entries: struct<key: string not null, value: string> not null
child 0, key: string not null
child 1, value: string
----
committed_at: [[2024-03-15 15:01:25.682,2024-03-15 15:01:25.730,2024-03-15 15:01:25.772]]
snapshot_id: [[805611270568163028,3679426539959220963,5588071473139865870]]
parent_id: [[null,805611270568163028,3679426539959220963]]
operation: [["append","overwrite","append"]]
manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-805611270568163028-0-43637daf-ea4b-4ceb-b096-a60c25481eb5.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-3679426539959220963-0-8be81019-adf1-4bb6-a127-e15217bd50b3.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-5588071473139865870-0-1382dd7e-5fbc-4c51-9776-a832d7d0984e.avro"]]
summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
分区
查看表的分区情况:
table.inspect.partitions()
pyarrow.Table
partition: struct<dt_month: int32, dt_day: date32[day]> not null
child 0, dt_month: int32
child 1, dt_day: date32[day]
spec_id: int32 not null
record_count: int64 not null
file_count: int32 not null
total_data_file_size_in_bytes: int64 not null
position_delete_record_count: int64 not null
position_delete_file_count: int32 not null
equality_delete_record_count: int64 not null
equality_delete_file_count: int32 not null
last_updated_at: timestamp[ms]
last_updated_snapshot_id: int64
----
partition: [
-- is_valid: all not null
-- child 0 type: int32
[null,null,612]
-- child 1 type: date32[day]
[null,2021-02-01,null]]
spec_id: [[2,1,0]]
record_count: [[1,1,2]]
file_count: [[1,1,2]]
total_data_file_size_in_bytes: [[641,641,1260]]
position_delete_record_count: [[0,0,0]]
position_delete_file_count: [[0,0,0]]
equality_delete_record_count: [[0,0,0]]
equality_delete_file_count: [[0,0,0]]
last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]
条目
用于显示表中当前数据文件和删除文件的所有清单条目。
table.inspect.entries()
pyarrow.Table
status: int8 not null
snapshot_id: int64 not null
sequence_number: int64 not null
file_sequence_number: int64 not null
data_file: struct<content: int8 not null, file_path: string not null, file_format: string not null, partition: struct<> not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>, value_counts: map<int32, int64>, null_value_counts: map<int32, int64>, nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>, upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets: list<item: int64>, equality_ids: list<item: int32>, sort_order_id: int32> not null
child 0, content: int8 not null
child 1, file_path: string not null
child 2, file_format: string not null
child 3, partition: struct<> not null
child 4, record_count: int64 not null
child 5, file_size_in_bytes: int64 not null
child 6, column_sizes: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 7, value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 8, null_value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 9, nan_value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 10, lower_bounds: map<int32, binary>
child 0, entries: struct<key: int32 not null, value: binary> not null
child 0, key: int32 not null
child 1, value: binary
child 11, upper_bounds: map<int32, binary>
child 0, entries: struct<key: int32 not null, value: binary> not null
child 0, key: int32 not null
child 1, value: binary
child 12, key_metadata: binary
child 13, split_offsets: list<item: int64>
child 0, item: int64
child 14, equality_ids: list<item: int32>
child 0, item: int32
child 15, sort_order_id: int32
readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: string
child 5, upper_bound: string
child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: double
child 5, upper_bound: double
child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: double
child 5, upper_bound: double
----
status: [[1]]
snapshot_id: [[6245626162224016531]]
sequence_number: [[1]]
file_sequence_number: [[1]]
data_file: [
-- is_valid: all not null
-- child 0 type: int8
[0]
-- child 1 type: string
["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]
-- child 2 type: string
["PARQUET"]
-- child 3 type: struct<>
-- is_valid: all not null
-- child 4 type: int64
[4]
-- child 5 type: int64
[1656]
-- child 6 type: map<int32, int64>
[keys:[1,2,3]values:[140,135,135]]
-- child 7 type: map<int32, int64>
[keys:[1,2,3]values:[4,4,4]]
-- child 8 type: map<int32, int64>
[keys:[1,2,3]values:[0,0,0]]
-- child 9 type: map<int32, int64>
[keys:[]values:[]]
-- child 10 type: map<int32, binary>
[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]
-- child 11 type: map<int32, binary>
[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]
-- child 12 type: binary
[null]
-- child 13 type: list<item: int64>
[[4]]
-- child 14 type: list<item: int32>
[null]
-- child 15 type: int32
[null]]
readable_metrics: [
-- is_valid: all not null
-- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string>
-- is_valid: all not null
-- child 0 type: int64
[140]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: string
["Amsterdam"]
-- child 5 type: string
["San Francisco"]
-- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
-- child 0 type: int64
[135]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: double
[37.773972]
-- child 5 type: double
[53.11254]
-- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
-- child 0 type: int64
[135]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: double
[-122.431297]
-- child 5 type: double
[6.0989]]
引用
用于显示数据表已知的快照引用:
table.inspect.refs()
pyarrow.Table
name: string not null
type: string not null
snapshot_id: int64 not null
max_reference_age_in_ms: int64
min_snapshots_to_keep: int32
max_snapshot_age_in_ms: int64
----
name: [["main","testTag"]]
type: [["BRANCH","TAG"]]
snapshot_id: [[2278002651076891950,2278002651076891950]]
max_reference_age_in_ms: [[null,604800000]]
min_snapshots_to_keep: [[null,10]]
max_snapshot_age_in_ms: [[null,604800000]]
清单文件
要显示表的当前文件清单:
table.inspect.manifests()
pyarrow.Table
content: int8 not null
path: string not null
length: int64 not null
partition_spec_id: int32 not null
added_snapshot_id: int64 not null
added_data_files_count: int32 not null
existing_data_files_count: int32 not null
deleted_data_files_count: int32 not null
added_delete_files_count: int32 not null
existing_delete_files_count: int32 not null
deleted_delete_files_count: int32 not null
partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
child 0, contains_null: bool not null
child 1, contains_nan: bool
child 2, lower_bound: string
child 3, upper_bound: string
----
content: [[0]]
path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
length: [[6886]]
partition_spec_id: [[0]]
added_snapshot_id: [[3815834705531553721]]
added_data_files_count: [[1]]
existing_data_files_count: [[0]]
deleted_data_files_count: [[0]]
added_delete_files_count: [[0]]
existing_delete_files_count: [[0]]
deleted_delete_files_count: [[0]]
partition_summaries: [[ -- is_valid: all not null
-- child 0 type: bool
[false]
-- child 1 type: bool
[false]
-- child 2 type: string
["test"]
-- child 3 type: string
["test"]]]
元数据日志条目
用于显示表元数据日志条目:
table.inspect.metadata_log_entries()
pyarrow.Table
timestamp: timestamp[ms] not null
file: string not null
latest_snapshot_id: int64
latest_schema_id: int32
latest_sequence_number: int64
----
timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]]
file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]]
latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]]
latest_schema_id: [[null,0,0,0]]
latest_sequence_number: [[null,0,0,0]]
历史
要查看表的历史记录:
table.inspect.history()
pyarrow.Table
made_current_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
is_current_ancestor: bool not null
----
made_current_at: [[2024-06-18 16:17:48.768,2024-06-18 16:17:49.240,2024-06-18 16:17:49.343,2024-06-18 16:17:49.511]]
snapshot_id: [[4358109269873137077,3380769165026943338,4358109269873137077,3089420140651211776]]
parent_id: [[null,4358109269873137077,null,4358109269873137077]]
is_current_ancestor: [[true,false,true,true]]
文件
查看表当前快照中的数据文件:
table.inspect.files()
pyarrow.Table
content: int8 not null
file_path: string not null
file_format: dictionary<values=string, indices=int32, ordered=0> not null
spec_id: int32 not null
record_count: int64 not null
file_size_in_bytes: int64 not null
column_sizes: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
null_value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
nan_value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
lower_bounds: map<int32, binary>
child 0, entries: struct<key: int32 not null, value: binary> not null
child 0, key: int32 not null
child 1, value: binary
upper_bounds: map<int32, binary>
child 0, entries: struct<key: int32 not null, value: binary> not null
child 0, key: int32 not null
child 1, value: binary
key_metadata: binary
split_offsets: list<item: int64>
child 0, item: int64
equality_ids: list<item: int32>
child 0, item: int32
sort_order_id: int32
readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: large_string
child 5, upper_bound: large_string
child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: double
child 5, upper_bound: double
child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: double
child 5, upper_bound: double
----
content: [[0,0]]
file_path: [["s3://warehouse/default/table_metadata_files/data/00000-0-9ea7d222-6457-467f-bad5-6fb125c9aa5f.parquet","s3://warehouse/default/table_metadata_files/data/00000-0-afa8893c-de71-4710-97c9-6b01590d0c44.parquet"]]
file_format: [["PARQUET","PARQUET"]]
spec_id: [[0,0]]
record_count: [[3,3]]
file_size_in_bytes: [[5459,5459]]
column_sizes: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109]]]
value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3]]]
null_value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1]]]
nan_value_counts: [[keys:[]values:[],keys:[]values:[]]]
lower_bounds: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]]
upper_bounds:[[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]]
key_metadata: [[0100,0100]]
split_offsets:[[[],[]]]
equality_ids:[[[],[]]]
sort_order_id:[[[],[]]]
readable_metrics: [
-- is_valid: all not null
-- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string>
-- is_valid: all not null
-- child 0 type: int64
[140]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: large_string
["Amsterdam"]
-- child 5 type: large_string
["San Francisco"]
-- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
-- child 0 type: int64
[135]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: double
[37.773972]
-- child 5 type: double
[53.11254]
-- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
-- child 0 type: int64
[135]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: double
[-122.431297]
-- child 5 type: double
[6.0989]]
信息
内容指数据文件存储的内容类型:0
表示数据
,1
表示位置删除
,2
表示等值删除
若仅查看当前快照中的数据文件或删除文件,请分别使用table.inspect.data_files()
和table.inspect.delete_files()
方法。
添加文件
高级 Iceberg 用户可以选择将现有的 Parquet 文件作为数据文件提交到 Iceberg 表中,而无需重写这些文件。
# Given that these parquet files have schema consistent with the Iceberg table
file_paths = [
"s3a://warehouse/default/existing-1.parquet",
"s3a://warehouse/default/existing-2.parquet",
]
# They can be added to the table without rewriting them
tbl.add_files(file_paths=file_paths)
# A new snapshot is committed to the table with manifests pointing to the existing parquet files
名称映射
由于add_files
直接使用现有文件而不生成新的感知Iceberg模式的Parquet文件,因此要求Iceberg表必须配置名称映射(该映射将Parquet文件中的字段名与Iceberg字段ID对应)。因此,add_files
要求Parquet文件的元数据中不能包含字段ID,若表未配置名称映射,则会基于当前表模式自动创建新的名称映射。
分区处理
add_files
仅需通过读取现有Parquet文件的元数据页脚来推断每个文件的分区值。该实现还支持将文件添加到采用MonthTransform
、TruncateTransform
等分区转换的Iceberg表(任何preserves_order
属性设为True的转换都受支持)。请注意,若PartitionField
源列的统计信息未存在于Parquet元数据中,分区值将被推断为None
。
维护操作影响
由于add_files
将现有Parquet文件作为普通数据文件提交至Iceberg表,因此过期快照等破坏性维护操作会移除这些文件。
模式演进
PyIceberg 通过 Python API 提供了完整的模式演进支持。它会自动处理字段 ID 的设置,并确保只进行非破坏性变更(可手动覆盖此限制)。
在以下示例中,.update_schema()
方法直接从表对象调用。
with table.update_schema() as update:
update.add_column("some_field", IntegerType(), "doc")
如果您需要进行的更改不仅限于模式演进,还可以启动一个事务:
with table.transaction() as transaction:
with transaction.update_schema() as update_schema:
update.add_column("some_other_field", IntegerType(), "doc")
# ... Update properties etc
按名称合并
通过使用 .union_by_name()
方法,您可以将另一个模式合并到现有模式中,而无需担心字段ID的问题。
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType, LongType
catalog = load_catalog()
schema = Schema(
NestedField(1, "city", StringType(), required=False),
NestedField(2, "lat", DoubleType(), required=False),
NestedField(3, "long", DoubleType(), required=False),
)
table = catalog.create_table("default.locations", schema)
new_schema = Schema(
NestedField(1, "city", StringType(), required=False),
NestedField(2, "lat", DoubleType(), required=False),
NestedField(3, "long", DoubleType(), required=False),
NestedField(10, "population", LongType(), required=False),
)
with table.update_schema() as update:
update.union_by_name(new_schema)
现在该表已合并了两个模式print(table.schema())
:
table {
1: city: optional string
2: lat: optional double
3: long: optional double
4: population: optional long
}
添加列
使用 add_column
可以轻松添加列,无需担心字段ID问题。
with table.update_schema() as update:
update.add_column("retries", IntegerType(), "Number of retries to place the bid")
# In a struct
update.add_column("details", StructType())
with table.update_schema() as update:
update.add_column(("details", "confirmed_by"), StringType(), "Name of the exchange")
在向复合类型添加列之前,该复合类型必须已存在。复合类型中的字段以元组形式添加。
重命名列
在Iceberg表中重命名字段非常简单:
with table.update_schema() as update:
update.rename_column("retries", "num_retries")
# This will rename `confirmed_by` to `processed_by` in the `details` struct
update.rename_column(("details", "confirmed_by"), "processed_by")
移动列
调整字段顺序:
with table.update_schema() as update:
update.move_first("symbol")
# This will move `bid` after `ask`
update.move_after("bid", "ask")
# This will move `confirmed_by` before `exchange` in the `details` struct
update.move_before(("details", "confirmed_by"), ("details", "exchange"))
更新列
更新字段的类型、描述或必填属性。
with table.update_schema() as update:
# Promote a float to a double
update.update_column("bid", field_type=DoubleType())
# Make a field optional
update.update_column("symbol", required=False)
# Update the documentation
update.update_column("symbol", doc="Name of the share on the exchange")
请注意,某些操作并不兼容,但您仍可通过设置 allow_incompatible_changes
自行承担风险来执行。
with table.update_schema(allow_incompatible_changes=True) as update:
# Incompatible change, cannot require an optional field
update.update_column("symbol", required=True)
删除列
删除一个字段,注意这是不兼容的变更(读取器/写入器可能会依赖该字段):
with table.update_schema(allow_incompatible_changes=True) as update:
update.delete_column("some_field")
# In a struct
update.delete_column(("details", "confirmed_by"))
分区演进
PyIceberg 支持分区演进功能。更多细节请参阅分区演进规范。
在演进分区时,需要使用表上的 update_spec
API。
with table.update_spec() as update:
update.add_field("id", BucketTransform(16), "bucketed_id")
update.add_field("event_ts", DayTransform(), "day_ts")
更新分区规格也可以作为与其他操作一起的事务的一部分来完成。
with table.transaction() as transaction:
with transaction.update_spec() as update_spec:
update_spec.add_field("id", BucketTransform(16), "bucketed_id")
update_spec.add_field("event_ts", DayTransform(), "day_ts")
# ... Update properties etc
添加字段
可以通过 add_field
API 添加新的分区字段,该接口接收要分区的字段名称、分区转换函数以及可选的分区名称。如果未指定分区名称,系统会自动生成一个。
with table.update_spec() as update:
update.add_field("id", BucketTransform(16), "bucketed_id")
update.add_field("event_ts", DayTransform(), "day_ts")
# identity is a shortcut API for adding an IdentityTransform
update.identity("some_field")
移除字段
如果某些字段不再适合作为分区依据,也可以通过 remove_field
API 来移除这些分区字段。
with table.update_spec() as update:
# Remove the partition field with the name
update.remove_field("some_partition_name")
重命名字段
可以通过rename_field
API来重命名分区字段。
with table.update_spec() as update:
# Rename the partition field with the name bucketed_id to sharded_id
update.rename_field("bucketed_id", "sharded_id")
表属性
通过Transaction
API设置和移除属性:
with table.transaction() as transaction:
transaction.set_properties(abc="def")
assert table.properties == {"abc": "def"}
with table.transaction() as transaction:
transaction.remove_properties("abc")
assert table.properties == {}
或者,不使用上下文管理器:
table = table.transaction().set_properties(abc="def").commit_transaction()
assert table.properties == {"abc": "def"}
table = table.transaction().remove_properties("abc").commit_transaction()
assert table.properties == {}
快照属性
在使用append
或overwrite
API写入表时,可以选择性地设置快照属性:
tbl.append(df, snapshot_properties={"abc": "def"})
# or
tbl.overwrite(df, snapshot_properties={"abc": "def"})
assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
快照管理
通过 Table
API 管理快照操作:
# To run a specific operation
table.manage_snapshots().create_tag(snapshot_id, "tag123").commit()
# To run multiple operations
table.manage_snapshots()
.create_tag(snapshot_id1, "tag123")
.create_tag(snapshot_id2, "tag456")
.commit()
# Operations are applied on commit.
你也可以使用上下文管理器来进行更多修改:
with table.manage_snapshots() as ms:
ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
视图
PyIceberg 支持视图操作。
检查视图是否存在
from pyiceberg.catalog import load_catalog
catalog = load_catalog("default")
catalog.view_exists("default.bar")
表统计信息管理
通过Table
API管理表统计信息的操作:
# To run a specific operation
table.update_statistics().set_statistics(statistics_file=statistics_file).commit()
# To run multiple operations
table.update_statistics()
.set_statistics(statistics_file1)
.remove_statistics(snapshot_id2)
.commit()
# Operations are applied on commit.
你也可以使用上下文管理器来进行更多修改:
with table.update_statistics() as update:
update.set_statistics(statistics_file)
update.remove_statistics(snapshot_id2)
查询数据
要查询表数据,需要进行表扫描操作。表扫描可以接收以下参数:过滤器条件、列名、可选的记录条数限制以及快照ID。
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual
catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")
scan = table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
limit=100,
)
# Or filter using a string predicate
scan = table.scan(
row_filter="trip_distance > 10.0",
)
[task.file.file_path for task in scan.plan_files()]
底层 API plan_files
方法返回一组任务,这些任务提供可能包含匹配行的文件:
[
"s3://warehouse/wh/nyc/taxis/data/00003-4-42464649-92dd-41ad-b83b-dea1a2fe4b58-00001.parquet"
]
在这种情况下,引擎需要自行过滤文件本身。以下 to_arrow()
和 to_duckdb()
方法已为您实现了这一功能。
Apache Arrow
要求
需要安装pyarrow
。
使用PyIceberg可以从大表中筛选数据并提取到PyArrow表中:
table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow()
这将返回一个 PyArrow 表:
pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[us, tz=+00:00]
tpep_dropoff_datetime: timestamp[us, tz=+00:00]
----
VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]]
tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]]
这将仅拉取可能包含匹配行的文件。
如果更倾向于一次读取一个记录批次,也可以返回一个PyArrow RecordBatchReader:
table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader()
Pandas
需求
使用此功能需要安装pandas
。
PyIceberg 能轻松从海量表中筛选数据并加载到本地的 Pandas 数据框中。该操作仅会获取查询所需的 Parquet 文件并应用过滤条件,从而减少 IO 操作,提升性能并降低成本。
table.scan(
row_filter="trip_distance >= 10.0",
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_pandas()
这将返回一个 Pandas 数据框:
***
VendorID tpep_pickup_datetime tpep_dropoff_datetime
0 2 2021-04-01 00:28:05+00:00 2021-04-01 00:47:59+00:00
1 1 2021-04-01 00:39:01+00:00 2021-04-01 00:57:39+00:00
2 2 2021-04-01 00:14:42+00:00 2021-04-01 00:42:59+00:00
3 1 2021-04-01 00:17:17+00:00 2021-04-01 00:43:38+00:00
4 1 2021-04-01 00:24:04+00:00 2021-04-01 00:56:20+00:00
... ... ... ...
116976 2 2021-04-30 23:56:18+00:00 2021-05-01 00:29:13+00:00
116977 2 2021-04-30 23:07:41+00:00 2021-04-30 23:37:18+00:00
116978 2 2021-04-30 23:38:28+00:00 2021-05-01 00:12:04+00:00
116979 2 2021-04-30 23:33:00+00:00 2021-04-30 23:59:00+00:00
116980 2 2021-04-30 23:44:25+00:00 2021-05-01 00:14:47+00:00
[116981 rows x 3 columns]
建议使用 Pandas 2 或更高版本,因为它将数据存储在 Apache Arrow 后端 中,从而避免了数据拷贝。
DuckDB
要求
需要安装DuckDB。
表扫描结果也可以转换为内存中的DuckDB表:
con = table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")
我们可以使用游标在DuckDB表上运行查询:
print(
con.execute(
"SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4"
).fetchall()
)
[
(datetime.timedelta(seconds=1194),),
(datetime.timedelta(seconds=1118),),
(datetime.timedelta(seconds=1697),),
(datetime.timedelta(seconds=1581),),
]
Ray
要求
这需要安装Ray。
表扫描也可以转换为Ray数据集:
ray_dataset = table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_ray()
这将返回一个 Ray 数据集:
***
Dataset(
num_blocks=1,
num_rows=1168798,
schema={
VendorID: int64,
tpep_pickup_datetime: timestamp[us, tz=UTC],
tpep_dropoff_datetime: timestamp[us, tz=UTC]
}
)
使用 Ray Dataset API 与数据集交互:
print(ray_dataset.take(2))
[
{
"VendorID": 2,
"tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 23, 50),
"tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 0, 34, 31),
},
{
"VendorID": 2,
"tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 5, 3),
"tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 16, 10, 18),
},
]
Daft
PyIceberg 与 Daft Dataframes 紧密集成(参见:Daft 与 Iceberg 的集成),它在 PyIceberg 表之上提供了一个完整的惰性优化查询引擎接口。
要求
这需要安装 Daft。
可以轻松将表读取到 Daft Dataframe 中:
df = table.to_daft() # equivalent to `daft.read_iceberg(table)`
df = df.where(df["trip_distance"] >= 10.0)
df = df.select("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime")
这将返回一个延迟物化的 Daft Dataframe。打印 df
会显示其结构模式:
╭──────────┬───────────────────────────────┬───────────────────────────────╮
│ VendorID ┆ tpep_pickup_datetime ┆ tpep_dropoff_datetime │
│ --- ┆ --- ┆ --- │
│ Int64 ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │
╰──────────┴───────────────────────────────┴───────────────────────────────╯
(No data to display: Dataframe not materialized)
我们可以使用 df.show()
执行 Dataframe 来预览查询的前几行数据。
该操作已正确优化,能够充分利用 Iceberg 的特性,如隐藏分区和文件级统计信息,以实现高效读取。
df.show(2)
╭──────────┬───────────────────────────────┬───────────────────────────────╮
│ VendorID ┆ tpep_pickup_datetime ┆ tpep_dropoff_datetime │
│ --- ┆ --- ┆ --- │
│ Int64 ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │
╞══════════╪═══════════════════════════════╪═══════════════════════════════╡
│ 2 ┆ 2008-12-31T23:23:50.000000 ┆ 2009-01-01T00:34:31.000000 │
├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2 ┆ 2008-12-31T23:05:03.000000 ┆ 2009-01-01T16:10:18.000000 │
╰──────────┴───────────────────────────────┴───────────────────────────────╯
(Showing first 2 rows)
Polars
PyIceberg 与 Polars 的 Dataframe 和 LazyFrame 紧密集成,在 PyIceberg 表之上提供了完整的惰性优化查询引擎接口。
要求
需要安装 polars
。
pip install pyiceberg['polars']
可以通过 DataFrame 或 LazyFrame 在 Polars 中分析和访问 PyIceberg 数据。如果您的代码使用 Apache Iceberg 数据扫描和检索 API,然后在 Polars 中分析结果 DataFrame,请使用 table.scan().to_polars()
API。如果目的是利用 Polars 的高性能过滤和检索功能,请使用从 Iceberg 表导出的 LazyFrame 并配合 table.to_polars()
API。
# Get LazyFrame
iceberg_table.to_polars()
# Get Data Frame
iceberg_table.scan().to_polars()
使用 Polars DataFrame 处理数据
PyIceberg 可以轻松地从海量表中筛选数据,并将其加载到本地的 Polars 数据框中。该操作仅会获取查询所需的 Parquet 文件并应用过滤条件,从而减少 I/O 操作,提升性能并降低成本。
schema = Schema(
NestedField(field_id=1, name='ticket_id', field_type=LongType(), required=True),
NestedField(field_id=2, name='customer_id', field_type=LongType(), required=True),
NestedField(field_id=3, name='issue', field_type=StringType(), required=False),
NestedField(field_id=4, name='created_at', field_type=TimestampType(), required=True),
required=True
)
iceberg_table = catalog.create_table(
identifier='default.product_support_issues',
schema=schema
)
pa_table_data = pa.Table.from_pylist(
[
{'ticket_id': 1, 'customer_id': 546, 'issue': 'User Login issue', 'created_at': 1650020000000000},
{'ticket_id': 2, 'customer_id': 547, 'issue': 'Payment not going through', 'created_at': 1650028640000000},
{'ticket_id': 3, 'customer_id': 548, 'issue': 'Error on checkout', 'created_at': 1650037280000000},
{'ticket_id': 4, 'customer_id': 549, 'issue': 'Unable to reset password', 'created_at': 1650045920000000},
{'ticket_id': 5, 'customer_id': 550, 'issue': 'Account locked', 'created_at': 1650054560000000},
{'ticket_id': 6, 'customer_id': 551, 'issue': 'Order not received', 'created_at': 1650063200000000},
{'ticket_id': 7, 'customer_id': 552, 'issue': 'Refund not processed', 'created_at': 1650071840000000},
{'ticket_id': 8, 'customer_id': 553, 'issue': 'Shipping address issue', 'created_at': 1650080480000000},
{'ticket_id': 9, 'customer_id': 554, 'issue': 'Product damaged', 'created_at': 1650089120000000},
{'ticket_id': 10, 'customer_id': 555, 'issue': 'Unable to apply discount code', 'created_at': 1650097760000000},
{'ticket_id': 11, 'customer_id': 556, 'issue': 'Website not loading', 'created_at': 1650106400000000},
{'ticket_id': 12, 'customer_id': 557, 'issue': 'Incorrect order received', 'created_at': 1650115040000000},
{'ticket_id': 13, 'customer_id': 558, 'issue': 'Unable to track order', 'created_at': 1650123680000000},
{'ticket_id': 14, 'customer_id': 559, 'issue': 'Order delayed', 'created_at': 1650132320000000},
{'ticket_id': 15, 'customer_id': 560, 'issue': 'Product not as described', 'created_at': 1650140960000000},
{'ticket_id': 16, 'customer_id': 561, 'issue': 'Unable to contact support', 'created_at': 1650149600000000},
{'ticket_id': 17, 'customer_id': 562, 'issue': 'Duplicate charge', 'created_at': 1650158240000000},
{'ticket_id': 18, 'customer_id': 563, 'issue': 'Unable to update profile', 'created_at': 1650166880000000},
{'ticket_id': 19, 'customer_id': 564, 'issue': 'App crashing', 'created_at': 1650175520000000},
{'ticket_id': 20, 'customer_id': 565, 'issue': 'Unable to download invoice', 'created_at': 1650184160000000},
{'ticket_id': 21, 'customer_id': 566, 'issue': 'Incorrect billing amount', 'created_at': 1650192800000000},
], schema=iceberg_table.schema().as_arrow()
)
iceberg_table.append(
df=pa_table_data
)
table.scan(
row_filter="ticket_id > 10",
).to_polars()
这将返回一个 Polars DataFrame:
shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue ┆ created_at │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ str ┆ datetime[μs] │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11 ┆ 556 ┆ Website not loading ┆ 2022-04-16 10:53:20 │
│ 12 ┆ 557 ┆ Incorrect order received ┆ 2022-04-16 13:17:20 │
│ 13 ┆ 558 ┆ Unable to track order ┆ 2022-04-16 15:41:20 │
│ 14 ┆ 559 ┆ Order delayed ┆ 2022-04-16 18:05:20 │
│ 15 ┆ 560 ┆ Product not as described ┆ 2022-04-16 20:29:20 │
│ … ┆ … ┆ … ┆ … │
│ 17 ┆ 562 ┆ Duplicate charge ┆ 2022-04-17 01:17:20 │
│ 18 ┆ 563 ┆ Unable to update profile ┆ 2022-04-17 03:41:20 │
│ 19 ┆ 564 ┆ App crashing ┆ 2022-04-17 06:05:20 │
│ 20 ┆ 565 ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21 ┆ 566 ┆ Incorrect billing amount ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
使用 Polars LazyFrame 工作
PyIceberg 支持基于 Iceberg 表创建 Polars LazyFrame。
使用上述代码示例:
lf = iceberg_table.to_polars().filter(pl.col("ticket_id") > 10)
print(lf.collect())
上述代码片段返回一个 Polars LazyFrame,并定义了一个将由 Polars 执行的过滤器:
shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue ┆ created_at │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ str ┆ datetime[μs] │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11 ┆ 556 ┆ Website not loading ┆ 2022-04-16 10:53:20 │
│ 12 ┆ 557 ┆ Incorrect order received ┆ 2022-04-16 13:17:20 │
│ 13 ┆ 558 ┆ Unable to track order ┆ 2022-04-16 15:41:20 │
│ 14 ┆ 559 ┆ Order delayed ┆ 2022-04-16 18:05:20 │
│ 15 ┆ 560 ┆ Product not as described ┆ 2022-04-16 20:29:20 │
│ … ┆ … ┆ … ┆ … │
│ 17 ┆ 562 ┆ Duplicate charge ┆ 2022-04-17 01:17:20 │
│ 18 ┆ 563 ┆ Unable to update profile ┆ 2022-04-17 03:41:20 │
│ 19 ┆ 564 ┆ App crashing ┆ 2022-04-17 06:05:20 │
│ 20 ┆ 565 ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21 ┆ 566 ┆ Incorrect billing amount ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
2025-05-28(三)