PyIceberg

在这里插入图片描述

本文翻译整理自:https://py.iceberg.apache.org/


PyIceberg 入门指南

PyIceberg 是一个用于访问 Iceberg 表的 Python 实现,无需依赖 JVM 环境。


相关链接资源


安装

在安装 PyIceberg 之前,请确保您使用的是最新版本的 pip

pip install --upgrade pip

你可以从 PyPI 安装最新发布版本:

pip install "pyiceberg[s3fs,hive]"

您可以根据需求混合搭配可选依赖项:

键值描述
hive支持Hive元存储
hive-kerberos支持Kerberos环境下的Hive元存储
glue支持AWS Glue
dynamodb支持AWS DynamoDB
sql-postgres支持Postgresql作为后端的SQL目录
sql-sqlite支持SQLite作为后端的SQL目录
pyarrow使用PyArrow作为FileIO实现与对象存储交互
pandas同时安装PyArrow和Pandas
duckdb同时安装PyArrow和DuckDB
ray同时安装PyArrow、Pandas和Ray
daft安装Daft
polars安装Polars
s3fs使用S3FS作为FileIO实现与对象存储交互
adlfs使用ADLFS作为FileIO实现与对象存储交互
snappy支持snappy Avro压缩
gcsfs使用GCSFS作为FileIO实现与对象存储交互
rest-sigv4支持为REST目录生成AWS SIGv4认证头

您需要安装s3fsadlfsgcsfspyarrow才能从对象存储获取文件。


连接到目录

Iceberg利用目录作为组织表的集中管理位置。这可以是传统的Hive目录(用于将Iceberg表与其他表一起存储)、供应商解决方案(如AWS Glue目录),或是Iceberg自身REST协议的实现。请查看配置页面获取所有配置细节。

出于演示目的,我们将配置目录使用SqlCatalog实现,该实现会将信息存储在本地sqlite数据库中。同时我们会将目录配置为在本地文件系统而非对象存储中存储数据文件。由于可扩展性有限,此配置不应在生产环境中使用。

为Iceberg创建临时存储位置:

mkdir /tmp/warehouse

打开 Python 3 REPL 来设置目录:

from pyiceberg.catalog import load_catalog

warehouse_path = "/tmp/warehouse"
catalog = load_catalog(
    "default",
    **{
        'type': 'sql',
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)

sql 目录适用于本地测试,无需额外服务。如需尝试其他目录,请参阅配置说明


写入PyArrow数据框

我们以出租车数据集为例,将其写入Iceberg表。

首先下载一个月的数据:

curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet

将其加载到您的 PyArrow 数据框中:

import pyarrow.parquet as pq

df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")

创建新的 Iceberg 表:

catalog.create_namespace("default")

table = catalog.create_table(
    "default.taxi_dataset",
    schema=df.schema,
)

将数据框追加到表中:

table.append(df)
len(table.scan().to_arrow())

已向表中写入 3,066,766 行数据。

现在生成用于模型训练的每英里小费特征:

import pyarrow.compute as pc

df = df.append_column("tip_per_mile", pc.divide(df["tip_amount"], df["trip_distance"]))

将表结构演进为包含新列:

with table.update_schema() as update_schema:
    update_schema.union_by_name(df.schema)

现在我们可以将新的数据帧写入 Iceberg 表:

table.overwrite(df)
print(table.scan().to_arrow())

新列已成功添加:

taxi_dataset(
  1: VendorID: optional long,
  2: tpep_pickup_datetime: optional timestamp,
  3: tpep_dropoff_datetime: optional timestamp,
  4: passenger_count: optional double,
  5: trip_distance: optional double,
  6: RatecodeID: optional double,
  7: store_and_fwd_flag: optional string,
  8: PULocationID: optional long,
  9: DOLocationID: optional long,
  10: payment_type: optional long,
  11: fare_amount: optional double,
  12: extra: optional double,
  13: mta_tax: optional double,
  14: tip_amount: optional double,
  15: tolls_amount: optional double,
  16: improvement_surcharge: optional double,
  17: total_amount: optional double,
  18: congestion_surcharge: optional double,
  19: airport_fee: optional double,
  20: tip_per_mile: optional double
),

我们可以看到,有2,371,784行数据具有每英里小费记录:

df = table.scan(row_filter="tip_per_mile > 0").to_arrow()
len(df)

探索Iceberg数据与元数据文件

由于目录配置为使用本地文件系统,我们可以查看Iceberg如何存储上述操作产生的数据和元数据文件。


find /tmp/warehouse/

更多详情

具体细节请查看 CLIPython API 页面。


配置


配置值设置

有三种方式传入配置:

  • 使用 .pyiceberg.yaml 配置文件(推荐方式)
  • 通过环境变量传递
  • 通过CLI或Python API直接传入凭证

配置文件可以存储在以下任一位置(按优先级排序):
1、PYICEBERG_HOME 环境变量指定的目录
2、用户主目录
3、当前工作目录

如需修改 .pyiceberg.yaml 的搜索路径,可以覆盖 PYICEBERG_HOME 环境变量。

另一种方式是通过环境变量进行配置:

export PYICEBERG_CATALOG__DEFAULT__URI=thrift://localhost:9083
export PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID=username
export PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY=password

Iceberg 读取的环境变量以 PYICEBERG_ 开头,后接以下 yaml 结构,其中双下划线 __ 表示嵌套字段,而下划线 _ 会被转换为连字符 -

例如,PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID 会在 default 目录下设置 s3.access-key-id


表配置

Iceberg 表支持通过表属性来配置表的行为。


写入选项

选项默认值描述
write.parquet.compression-codec{uncompressed,zstd,gzip,snappy}zstd设置Parquet压缩编解码器。
write.parquet.compression-level整数nullParquet编解码器的压缩级别。若未设置,则由PyIceberg决定
write.parquet.row-group-limit行数1048576单个行组内条目数量的上限
write.parquet.page-size-bytes字节大小1MB为列块内数据页的近似编码大小设置目标阈值
write.parquet.page-row-limit行数20000为列块内最大行数设置目标阈值
write.parquet.dict-size-bytes字节大小2MB设置每个行组的字典页大小限制
write.metadata.previous-versions-max整数100提交后删除前保留的旧版本元数据文件最大数量。
write.metadata.delete-after-commit.enabled布尔值False是否在每次表提交后自动删除旧的已跟踪元数据文件。将保留最近的部分元数据文件,数量可通过属性write.metadata.previous-versions-max设置。
write.object-storage.enabled布尔值False启用ObjectStoreLocationProvider,为文件路径添加哈希组件。
write.object-storage.partitioned-paths布尔值True当启用对象存储时,控制分区值是否包含在文件路径中
write.py-location-provider.impl格式为module.ClassName的字符串null可选项,自定义LocationProvider实现
write.data.path指向位置的字符串{metadata.location}/data设置数据写入的位置。
write.metadata.path指向位置的字符串{metadata.location}/metadata设置元数据写入的位置。

表行为选项

键名选项默认值描述
commit.manifest.target-size-bytes字节大小8388608 (8MB)合并清单文件时的目标大小
commit.manifest.min-count-to-merge清单数量100合并清单文件时的最小数量阈值
commit.manifest-merge.enabled布尔值False控制写入时是否自动合并清单文件

快速追加模式

与Java实现不同,PyIceberg默认采用快速追加模式,因此commit.manifest-merge.enabled默认设置为False


FileIO

Iceberg采用FileIO这一可插拔模块的概念来实现文件的读取、写入和删除操作。默认情况下,PyIceberg会根据文件路径协议(如s3://gs://等)自动尝试初始化匹配的FileIO实现,并优先使用已安装的第一个可用模块。支持以下协议及对应实现:

  • s3, s3a, s3n: PyArrowFileIO, FsspecFileIO
  • gs: PyArrowFileIO
  • file: PyArrowFileIO
  • hdfs: PyArrowFileIO
  • abfs, abfss: FsspecFileIO
  • oss: PyArrowFileIO

用户也可显式指定FileIO实现:

键名示例说明
py-io-implpyiceberg.io.fsspec.FsspecFileIO强制指定FileIO实现类,若无法加载则会明确报错

FileIO模块支持以下配置选项:

S3

键名示例描述
s3.endpointhttps://10.0.19.25/为FileIO配置访问S3服务的替代端点。可用于使S3FileIO兼容任何具有不同端点的S3兼容对象存储服务,或访问虚拟私有云中的私有S3端点。
s3.access-key-idadmin配置用于访问FileIO的静态访问密钥ID。
s3.secret-access-keypassword配置用于访问FileIO的静态秘密访问密钥。
s3.session-tokenAQoDYXdzEJr…配置用于访问FileIO的静态会话令牌。
s3.role-session-namesession为假定角色会话配置的可选标识符。
s3.role-arnarn:aws:…AWS角色ARN。如果提供此参数而非access_key和secret_key,将通过担任此角色获取临时凭证。
s3.signerbearer配置FileIO的签名版本。
s3.signer.urihttp://my.signer:8080/s3当远程签名URI与目录URI不同时进行配置。远程签名仅对FsspecFileIO实现。最终请求将发送至<s3.signer.uri>/<s3.signer.endpoint>
s3.signer.endpointv1/main/s3-sign配置远程签名端点。远程签名仅对FsspecFileIO实现。最终请求将发送至<s3.signer.uri>/<s3.signer.endpoint>(默认值:v1/aws/s3/sign)。
s3.regionus-west-2配置用于初始化S3FileSystem的默认区域。若未设置,PyArrowFileIO会尝试自动解析区域(仅支持AWS S3存储桶)。
s3.resolve-regionFalse仅支持PyArrowFileIO,启用时将始终尝试解析存储桶位置(仅支持AWS S3存储桶)。
s3.proxy-urihttp://my.proxy.com:8080配置FileIO使用的代理服务器。
s3.connect-timeout60.0配置套接字连接超时时间(单位:秒)。
s3.request-timeout60.0在Windows和macOS上配置套接字读取超时时间(单位:秒)。
s3.force-virtual-addressingFalse是否强制使用存储桶虚拟寻址。若为true,则始终启用虚拟寻址;若为false,则仅当endpoint_override为空时启用。适用于仅支持虚拟托管式访问的非AWS后端服务。

HDFS

键名示例描述
hdfs.hosthttps://10.0.19.25/配置要连接的HDFS主机地址
hdfs.port9000配置要连接的HDFS端口号
hdfs.useruser配置用于连接的HDFS用户名
hdfs.kerberos_ticketkerberos_ticket配置Kerberos票据缓存路径

Azure Data Lake

示例描述
adls.connection-stringAccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqF…;BlobEndpoint=http://localhost/连接字符串。可用于通过FileIO连接任何兼容ADLS的对象存储服务(如azurite),这些服务可能使用不同的端点。
adls.account-namedevstoreaccount1要连接的账户名称
adls.account-keyEby8vdM02xNOcqF…用于账户认证的密钥
adls.sas-tokenNuHOuuzdQN7VRM%2FOpOeqBlawRCA845IY05h9eu1Yte4%3D共享访问签名
adls.tenant-idad667be4-b811-11ed-afa1-0242ac120002租户ID
adls.client-idad667be4-b811-11ed-afa1-0242ac120002客户端ID
adls.client-secretoCA3R6P*ka#oa1Sms2J74z…客户端密钥

Google Cloud Storage

键名示例描述
gcs.project-idmy-gcp-project为GCS FileIO配置Google Cloud项目ID。
gcs.oauth2.tokenya29.dr.AfM…用于临时访问的令牌字符串。
gcs.oauth2.token-expires-at1690971805918配置基于访问令牌生成的凭据过期时间(自纪元起的毫秒数)。
gcs.accessread_only配置客户端访问权限,可选值为’read_only’、‘read_write’或’full_control’。
gcs.consistencymd5配置文件写入时的校验方式,可选值为’none’、‘size’或’md5’。
gcs.cache-timeout60配置对象元数据缓存的过期时间(秒)。
gcs.requester-paysFalse配置是否使用请求方付费模式。
gcs.session-kwargs{}配置传递给aiohttp.ClientSession的参数字典(可包含代理设置等)。
gcs.service.hosthttp://0.0.0.0:4443配置GCS FileIO的替代访问端点(格式:协议://主机:端口)。未设置时默认使用环境变量"STORAGE_EMULATOR_HOST"的值,若该变量也未设置则使用标准Google端点。
gcs.default-locationUS配置存储桶的默认创建位置(如’US’或’EUROPE-WEST3’)。
gcs.version-awareFalse配置是否支持GCS存储桶的对象版本控制功能。

阿里云对象存储服务(OSS)

PyIceberg使用S3FileSystem类连接OSS存储桶,因为该服务兼容S3 SDK,只要端点采用虚拟托管样式地址即可。

键名示例描述
s3.endpointhttps://s3.oss-your-bucket-region.aliyuncs.com/为FileIO配置OSS服务的访问端点。必须使用示例中给出的S3兼容端点。
s3.access-key-idadmin配置用于访问FileIO的静态访问密钥ID。
s3.secret-access-keypassword配置用于访问FileIO的静态密钥访问密码。
s3.session-tokenAQoDYXdzEJr…配置用于访问FileIO的静态会话令牌。
s3.force-virtual-addressingTrue是否使用存储桶的虚拟地址。必须设置为True,因为OSS只能通过虚拟托管样式地址访问。

PyArrow

键名示例描述
pyarrow.use-large-types-on-readTrue在表扫描时使用大型PyArrow类型,即large_stringlarge_binarylarge_list字段类型。默认值为True。

位置提供器

Apache Iceberg 使用 LocationProvider 的概念来管理表数据文件的路径。在 PyIceberg 中,LocationProvider 模块设计为可插拔式,允许针对特定用例进行定制,并额外确定元数据文件的位置。表的 LocationProvider 可以通过表属性进行指定。

数据文件和元数据文件的位置均可通过配置表属性 write.data.pathwrite.metadata.path 分别进行自定义。

如需更细粒度的控制,可以重写 LocationProvidernew_data_locationnew_metadata_location 方法,以定义生成文件路径的自定义逻辑。详见 加载自定义位置提供器

PyIceberg 默认使用 SimpleLocationProvider 来管理文件路径。


简单位置提供器

SimpleLocationProvider 提供以 {location}/data/ 为前缀的路径,其中 location 来自表元数据。可以通过设置write.data.path 表配置来覆盖此默认行为。

例如,一个非分区表的数据文件可能具有如下路径:

s3://bucket/ns/table/data/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

当表被分区时,给定分区下的文件会被分组到一个子目录中,该分区键和值作为目录名称——这被称为Hive风格的分区路径格式。例如,按字符串列category分区的表可能有一个数据文件,其路径如下:

s3://bucket/ns/table/data/category=orders/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

对象存储位置提供器

PyIceberg 提供了 ObjectStoreLocationProvider 以及可选的分区排除优化功能,专为存储在对象存储中的表设计。如需了解这些配置的更多背景和动机,请参阅Iceberg Java实现的文档

当多个文件存储在相同前缀下时,S3等云对象存储通常会对前缀进行请求限流,导致性能下降。ObjectStoreLocationProvider 通过在文件路径中注入二进制目录形式的确定性哈希值来应对这一问题,从而将文件分散到更多对象存储前缀中。

路径以 {location}/data/ 为前缀,其中 location 来自表元数据,其方式类似于 SimpleLocationProvider。这可以通过设置write.data.path表配置来覆盖。

例如,一个按字符串列 category 分区的表可能具有如下位置的数据文件:(注意额外的二进制目录)


s3://bucket/ns/table/data/0101/0110/1001/10110010/category=orders/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

为表启用 ObjectStoreLocationProvider 需要显式将其 write.object-storage.enabled 表属性设置为 True


分区排除

当使用 ObjectStoreLocationProvider 时,表属性 write.object-storage.partitioned-paths(默认为 True)可设置为 False,作为针对对象存储的额外优化。这将完全省略数据文件路径中的分区键和值,以进一步减小键大小。禁用该属性后,上述相同的数据文件将被写入以下路径:(注意缺少 category=orders


s3://bucket/ns/table/data/1101/0100/1011/00111010-00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

加载自定义位置提供器

与 FileIO 类似,可以通过具体继承抽象基类 LocationProvider 来为表提供自定义的 LocationProvider

需要将表属性 write.py-location-provider.impl 设置为自定义 LocationProvider 的完全限定名称(例如 mymodule.MyLocationProvider)。请注意,LocationProvider 是按表配置的,允许为不同表提供不同的位置分配方案。还需注意,Iceberg 的 Java 实现使用不同的表属性 write.location-provider.impl 来配置自定义 Java 实现。

下面展示了一个自定义 LocationProvider 的实现示例。


import uuid

class UUIDLocationProvider(LocationProvider):
    def __init__(self, table_location: str, table_properties: Properties):
        super().__init__(table_location, table_properties)

    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        # Can use any custom method to generate a file path given the partitioning information and file name
        prefix = f"{self.table_location}/{uuid.uuid4()}"
        return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"

目录

PyIceberg 目前原生支持 REST、SQL、Hive、Glue 和 DynamoDB 类型的目录。此外,您也可以直接设置目录实现:

键名示例描述
typerest目录类型,可选值为 restsqlhivegluedymamodb,默认为 rest
py-catalog-implmypackage.mymodule.MyCatalog显式设置目录实现类,若无法加载则会明确报错

REST Catalog


catalog:
  default:
    uri: http://rest-catalog/ws/
    credential: t-1234:secret

  default-mtls-secured-catalog:
    uri: https://rest-catalog/ws/
    ssl:
      client:
        cert: /absolute/path/to/client.crt
        key: /absolute/path/to/client.key
      cabundle: /absolute/path/to/cabundle.pem

示例描述
urihttps://rest-catalog/ws标识REST服务器的URI
ugit-1234:secretHive客户端使用的Hadoop UGI凭证
credentialt-1234:secret初始化目录时用于OAuth2凭证流的认证信息
tokenFEW23.DFSDF.FSDF用于Authorization头的Bearer令牌值
scopeopenid offline corpds:ds:profile请求安全令牌的期望作用域(默认为catalog)
resourcerest_catalog.iceberg.com目标资源或服务的URI
audiencerest_catalog目标资源或服务的逻辑名称
rest.sigv4-enabledtrue使用AWS SigV4协议对REST服务器请求进行签名
rest.signing-regionus-east-1使用SigV4签名请求时指定的区域
rest.signing-nameexecute-api使用SigV4签名请求时指定的服务签名名称
oauth2-server-urihttps://auth-service/cc客户端凭证认证使用的认证URL(默认为uri + ‘v1/oauth/tokens’)

RESTCatalog中的请求头配置

要在RESTCatalog中配置自定义请求头,请在目录属性中添加以header.为前缀的配置项。这样可以确保所有发送至REST服务的HTTP请求都包含指定的请求头。


catalog:
  default:
    uri: http://rest-catalog/ws/
    credential: t-1234:secret
    header.content-type: application/vnd.api+json

RESTCatalog规范定义的特定请求头包括:

键名选项默认值描述
header.X-Iceberg-Access-Delegation{vended-credentials,remote-signing}vended-credentials向服务器表明客户端支持通过逗号分隔的访问机制列表进行委托访问。服务器可以选择通过任意或完全不通过请求的机制来提供访问权限

SQL Catalog

SQL Catalog需要一个数据库作为其后端。PyIceberg通过psycopg2支持PostgreSQL和SQLite。必须使用uri属性配置数据库连接。init_catalog_tables是可选的,默认值为True。如果设置为False,初始化SQLCatalog时将不会创建目录表。详情请参阅SQLAlchemy的URL格式文档

对于PostgreSQL:

catalog:
  default:
    type: sql
    uri: postgresql+psycopg2://username:password@localhost/mydatabase
    init_catalog_tables: false

对于SQLite的情况:

仅限开发用途

SQLite并非为并发场景设计,建议仅将此目录用于探索性或开发目的。


catalog:
  default:
    type: sql
    uri: sqlite:tmp/pyiceberg.db
    init_catalog_tables: false

键名示例默认值描述
uripostgresql+psycopg2://username:password@localhost/mydatabase目录数据库的SQLAlchemy后端URL(参见URL格式文档
echotruefalseSQLAlchemy引擎的echo参数,用于将所有语句记录到默认日志处理器
pool_pre_pingtruefalseSQLAlchemy引擎的pool_pre_ping参数,用于在每次检出时测试连接活性

内存目录

内存目录基于SqlCatalog构建,并使用SQLite内存数据库作为其后端。

它适用于测试、演示和实验环境,但不适合生产环境,因为不支持并发访问。


catalog:
  default:
    type: in-memory
    warehouse: /tmp/pyiceberg/warehouse

键名示例默认值描述
warehouse/tmp/pyiceberg/warehousefile:///tmp/iceberg/warehouse内存目录将存储其数据文件的目录路径。

Hive Catalog


catalog:
  default:
    uri: thrift://localhost:9083
    s3.endpoint: http://localhost:9000
    s3.access-key-id: admin
    s3.secret-access-key: password

键名示例描述
hive.hive2-compatibletrue启用 Hive 2.x 兼容模式
hive.kerberos-authenticationtrue通过 Kerberos 进行认证

使用 Hive 2.x 时,请确保设置兼容性标志:

catalog:
  default:
...
    hive.hive2-compatible: true

Glue Catalog

您的AWS凭证可以直接通过Python API传递。否则,请参考如何配置AWS凭证在本地设置您的AWS账户凭证。


catalog:
  default:
    type: glue
    glue.access-key-id: <ACCESS_KEY_ID>
    glue.secret-access-key: <SECRET_ACCESS_KEY>
    glue.session-token: <SESSION_TOKEN>
    glue.region: <REGION_NAME>
    s3.endpoint: http://localhost:9000
    s3.access-key-id: admin
    s3.secret-access-key: password
catalog:
  default:
    type: glue
    glue.profile-name: <PROFILE_NAME>
    glue.region: <REGION_NAME>
    s3.endpoint: http://localhost:9000
    s3.access-key-id: admin
    s3.secret-access-key: password

客户端专属属性

glue.* 属性仅适用于 Glue Catalog。如需在 Glue Catalog 和 S3 FileIO 中使用相同的凭证,可设置 client.* 属性。详见统一AWS凭证章节。

键名示例值描述
glue.id111111111111配置Glue Catalog的12位数字ID
glue.skip-archivetrue配置是否跳过旧表版本的归档。默认为true
glue.endpointhttps://glue.us-east-1.amazonaws.com配置GlueCatalog访问Glue服务的备用端点
glue.profile-namedefault配置访问Glue Catalog使用的静态profile
glue.regionus-east-1设置Glue Catalog的区域
glue.access-key-idadmin配置访问Glue Catalog使用的静态访问密钥ID
glue.secret-access-keypassword配置访问Glue Catalog使用的静态密钥
glue.session-tokenAQoDYXdzEJr…配置访问Glue Catalog使用的静态会话令牌
glue.max-retries10配置Glue服务调用的最大重试次数
glue.retry-modestandard配置Glue服务的重试模式。默认为standard

已移除属性

profile_nameregion_nameaws_access_key_idaws_secret_access_keyaws_session_token 属性已在0.8.0版本弃用并移除


DynamoDB Catalog

若需使用AWS DynamoDB作为元数据目录,可通过以下两种方式配置pyiceberg,并参考如何配置AWS凭证在本地设置AWS账户凭证。如需为DynamoDB Catalog和S3 FileIO使用相同凭证,可设置client.*属性


catalog:
  default:
    type: dynamodb
    table-name: iceberg

如果您更倾向于显式传递凭据给客户端,而不是依赖环境变量,


catalog:
  default:
    type: dynamodb
    table-name: iceberg
    dynamodb.access-key-id: <ACCESS_KEY_ID>
    dynamodb.secret-access-key: <SECRET_ACCESS_KEY>
    dynamodb.session-token: <SESSION_TOKEN>
    dynamodb.region: <REGION_NAME>
    s3.endpoint: http://localhost:9000
    s3.access-key-id: admin
    s3.secret-access-key: password

客户端专属属性

dynamodb.* 属性仅适用于 DynamoDB Catalog。如需在 DynamoDB Catalog 和 S3 FileIO 中使用相同凭证,可设置 client.* 属性。详见统一AWS凭证章节。

键名示例描述
dynamodb.profile-namedefault配置用于访问 DynamoDB Catalog 的静态凭证配置文件
dynamodb.regionus-east-1设置 DynamoDB Catalog 所在区域
dynamodb.access-key-idadmin配置用于访问 DynamoDB Catalog 的静态访问密钥ID
dynamodb.secret-access-keypassword配置用于访问 DynamoDB Catalog 的静态密钥访问密码
dynamodb.session-tokenAQoDYXdzEJr…配置用于访问 DynamoDB Catalog 的静态会话令牌

已移除属性

profile_nameregion_nameaws_access_key_idaws_secret_access_keyaws_session_token 属性已在 0.8.0 版本弃用并移除


自定义目录实现

如需加载任何自定义目录实现,可按如下方式设置目录配置:

catalog:
  default:
    py-catalog-impl: mypackage.mymodule.MyCatalog
    custom-key1: value1
    custom-key2: value2

统一AWS凭证配置

您可以通过配置client.*属性来显式设置Glue/DynamoDB Catalog和S3 FileIO的AWS凭证。例如:

catalog:
  default:
    type: glue
    client.access-key-id: <ACCESS_KEY_ID>
    client.secret-access-key: <SECRET_ACCESS_KEY>
    client.region: <REGION_NAME>

为Glue Catalog和S3 FileIO配置AWS凭证。

键名示例描述
client.regionus-east-1设置Glue/DynamoDB Catalog和S3 FileIO的区域
client.access-key-idadmin配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态访问密钥ID
client.secret-access-keypassword配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态秘密访问密钥
client.session-tokenAQoDYXdzEJr…配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态会话令牌
client.role-session-namesession可选参数,用于标识所承担角色的会话名称
client.role-arnarn:aws:…AWS角色ARN。如果提供此参数而非access_key和secret_key,将通过承担此角色来获取临时凭证

属性优先级

若设置了服务特定属性,client.*属性将被覆盖。例如,若client.region设为us-west-1s3.region设为us-east-1,则S3 FileIO将使用us-east-1作为区域。


并发控制

PyIceberg 采用多线程机制来并行化操作。您可以通过以下两种方式配置工作线程数量:
1、在配置文件中设置 max-workers 参数
2、通过环境变量 PYICEBERG_MAX_WORKERS 指定

默认线程数会根据系统硬件和 Python 版本自动确定,具体细节可参考 Python 官方文档


向后兼容性

旧版Java实现(<1.4.0)错误地将current-snapshot-id这个可选属性假定为TableMetadata中的必填属性。这意味着如果元数据文件中缺少current-snapshot-id(例如建表时),应用程序会因无法加载表而抛出异常。该问题在较新的Iceberg版本中已修正。但用户仍可通过配置强制PyIceberg生成兼容旧版规范的元数据文件,只需在配置文件中将legacy-current-snapshot-id属性设为"True",或设置环境变量PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID。更多技术细节请参阅PR讨论


纳秒级支持

PyIceberg 当前在 TimestampType 中仅支持微秒级精度。PyArrow 时间戳类型若为 ‘s’ 或 ‘ms’ 精度,在写入时会自动向上转换为 ‘us’ 精度时间戳。如果需要,‘ns’ 精度的时间戳在写入时也可自动向下转换。可通过在配置文件中设置 downcast-ns-timestamp-to-us-on-write 属性为 “True”,或设置环境变量 PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE 来启用此功能。关于纳秒级支持的长期规划详情,请参阅纳秒时间戳提案文档


Python 命令行工具

PyIceberg 提供了一个 CLI 工具,安装 pyiceberg 包后即可使用。

虽然可以通过 --uri--credential 参数传递 Catalog 路径,但建议按照 Catalog 章节所述配置 ~/.pyiceberg.yaml 文件。


➜  pyiceberg --help
Usage: pyiceberg [OPTIONS] COMMAND [ARGS]...

Options:
--catalog TEXT
--verbose BOOLEAN
--output [text|json]
--ugi TEXT
--uri TEXT
--credential TEXT
--help                Show this message and exit.

Commands:
describe    Describes a namespace xor table
drop        Operations to drop a namespace or table
list        Lists tables or namespaces
location    Returns the location of the table
properties  Properties on tables/namespaces
rename      Renames a table
schema      Gets the schema of the table
spec        Returns the partition spec of the table
uuid        Returns the UUID of the table

这个示例假设您已设置了默认目录。如果想加载其他目录(例如上文中的 rest 示例),则需要设置 --catalog rest


➜  pyiceberg list
default
nyc
➜  pyiceberg list nyc
nyc.taxis
➜  pyiceberg describe nyc.taxis
Table format version  1
Metadata location     file:/.../nyc.db/taxis/metadata/00000-aa3a3eac-ea08-4255-b890-383a64a94e42.metadata.json
Table UUID            6cdfda33-bfa3-48a7-a09e-7abb462e3460
Last Updated          1661783158061
Partition spec        []
Sort order            []
Current schema        Schema, id=0
├── 1: VendorID: optional long
├── 2: tpep_pickup_datetime: optional timestamptz
├── 3: tpep_dropoff_datetime: optional timestamptz
├── 4: passenger_count: optional double
├── 5: trip_distance: optional double
├── 6: RatecodeID: optional double
├── 7: store_and_fwd_flag: optional string
├── 8: PULocationID: optional long
├── 9: DOLocationID: optional long
├── 10: payment_type: optional long
├── 11: fare_amount: optional double
├── 12: extra: optional double
├── 13: mta_tax: optional double
├── 14: tip_amount: optional double
├── 15: tolls_amount: optional double
├── 16: improvement_surcharge: optional double
├── 17: total_amount: optional double
├── 18: congestion_surcharge: optional double
└── 19: airport_fee: optional double
Current snapshot      Operation.APPEND: id=5937117119577207079, schema_id=0
Snapshots             Snapshots
└── Snapshot 5937117119577207079, schema 0: file:/.../nyc.db/taxis/metadata/snap-5937117119577207079-1-94656c4f-4c66-4600-a4ca-f30377300527.avro
Properties            owner                 root
write.format.default  parquet

或者输出为JSON格式以便自动化处理:

➜  pyiceberg --output json describe nyc.taxis | jq
{
  "identifier": [
    "nyc",
    "taxis"
  ],
  "metadata_location": "file:/.../nyc.db/taxis/metadata/00000-aa3a3eac-ea08-4255-b890-383a64a94e42.metadata.json",
  "metadata": {
    "location": "file:/.../nyc.db/taxis",
    "table-uuid": "6cdfda33-bfa3-48a7-a09e-7abb462e3460",
    "last-updated-ms": 1661783158061,
    "last-column-id": 19,
    "schemas": [
      {
        "type": "struct",
        "fields": [
          {
            "id": 1,
            "name": "VendorID",
            "type": "long",
            "required": false
          },
...
          {
            "id": 19,
            "name": "airport_fee",
            "type": "double",
            "required": false
          }
        ],
        "schema-id": 0,
        "identifier-field-ids": []
      }
    ],
    "current-schema-id": 0,
    "partition-specs": [
      {
        "spec-id": 0,
        "fields": []
      }
    ],
    "default-spec-id": 0,
    "last-partition-id": 999,
    "properties": {
      "owner": "root",
      "write.format.default": "parquet"
    },
    "current-snapshot-id": 5937117119577207000,
    "snapshots": [
      {
        "snapshot-id": 5937117119577207000,
        "timestamp-ms": 1661783158061,
        "manifest-list": "file:/.../nyc.db/taxis/metadata/snap-5937117119577207079-1-94656c4f-4c66-4600-a4ca-f30377300527.avro",
        "summary": {
          "operation": "append",
          "spark.app.id": "local-1661783139151",
          "added-data-files": "1",
          "added-records": "2979431",
          "added-files-size": "46600777",
          "changed-partition-count": "1",
          "total-records": "2979431",
          "total-files-size": "46600777",
          "total-data-files": "1",
          "total-delete-files": "0",
          "total-position-deletes": "0",
          "total-equality-deletes": "0"
        },
        "schema-id": 0
      }
    ],
    "snapshot-log": [
      {
        "snapshot-id": "5937117119577207079",
        "timestamp-ms": 1661783158061
      }
    ],
    "metadata-log": [],
    "sort-orders": [
      {
        "order-id": 0,
        "fields": []
      }
    ],
    "default-sort-order-id": 0,
    "refs": {
      "main": {
        "snapshot-id": 5937117119577207000,
        "type": "branch"
      }
    },
    "format-version": 1,
    "schema": {
      "type": "struct",
      "fields": [
        {
          "id": 1,
          "name": "VendorID",
          "type": "long",
          "required": false
        },
...
        {
          "id": 19,
          "name": "airport_fee",
          "type": "double",
          "required": false
        }
      ],
      "schema-id": 0,
      "identifier-field-ids": []
    },
    "partition-spec": []
  }
}

Python API

PyIceberg 围绕目录(catalog)机制来加载表。第一步是实例化一个用于加载表的目录。我们使用以下配置来定义一个名为 prod 的目录:

catalog:
  prod:
    uri: http://rest-catalog/ws/
    credential: t-1234:secret

请注意,可以在同一个 .pyiceberg.yaml 文件中定义多个目录。


catalog:
  hive:
    uri: thrift://127.0.0.1:9083
    s3.endpoint: http://127.0.0.1:9000
    s3.access-key-id: admin
    s3.secret-access-key: password
  rest:
    uri: https://rest-server:8181/
    warehouse: my-warehouse

通过调用 load_catalog(name="hive")load_catalog(name="rest") 在 Python 中加载。

这些配置信息必须放置在名为 .pyiceberg.yaml 的文件中,该文件可以位于以下目录之一:

  • $HOME%USERPROFILE% 目录(根据操作系统是 Unix 类或 Windows 类而定)
  • 当前工作目录
  • $PYICEBERG_HOME 目录(如果设置了对应的环境变量)

有关配置选项的更多细节,请参阅专用页面

然后加载名为 prod 的 catalog:

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "docs",
    **{
        "uri": "http://127.0.0.1:8181",
        "s3.endpoint": "http://127.0.0.1:9000",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
)

让我们创建一个命名空间:

catalog.create_namespace("docs_example")

然后列出它们:

ns = catalog.list_namespaces()

assert ns == [("docs_example",)]

然后列出命名空间中的表:

catalog.list_tables("docs_example")

创建表

要通过目录创建表:

from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    FloatType,
    DoubleType,
    StringType,
    NestedField,
    StructType,
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            NestedField(
                field_id=4, name="created_by", field_type=StringType(), required=False
            ),
        ),
        required=False,
    ),
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)

from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
)

创建表时,模式中的所有ID都会被重新分配以确保唯一性。

要通过pyarrow模式创建表:

import pyarrow as pa

schema = pa.schema(
    [
        pa.field("foo", pa.string(), nullable=True),
        pa.field("bar", pa.int32(), nullable=False),
        pa.field("baz", pa.bool_(), nullable=True),
    ]
)

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
)

要以事务方式原子化地创建表并进行后续修改:

with catalog.create_table_transaction(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
) as txn:
    with txn.update_schema() as update_schema:
        update_schema.add_column(path="new_column", field_type=StringType())

    with txn.update_spec() as update_spec:
        update_spec.add_identity("symbol")

    txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")

加载表


目录表

加载 bids 表:

table = catalog.load_table("docs_example.bids")
# Equivalent to:
table = catalog.load_table(("docs_example", "bids"))
# The tuple syntax can be used if the namespace or table contains a dot.

这将返回一个表示 Iceberg 表的 Table 对象,该表可被查询和修改。


静态表

要直接从元数据文件加载表(即使用目录),可以按如下方式使用 StaticTable

from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)

静态表被视为只读。


检查表是否存在

要检查 bids 表是否存在:

catalog.table_exists("docs_example.bids")

如果表已存在,则返回 True


写入支持

从 PyIceberg 0.6.0 版本开始,通过 Arrow 实现了写入支持功能。让我们来看一个 Arrow 表的示例:

import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

接下来,根据该模式创建表:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table("default.cities", schema=schema)

现在将数据写入表:

快速追加

PyIceberg 默认采用快速追加方式以最小化写入数据量。这种方式能实现快速写入,降低冲突可能性。快速追加的缺点是会比普通提交产生更多元数据。计划实现压缩功能,当达到阈值时将自动重写所有元数据,以保持读取性能。


tbl.append(df)

# or

tbl.overwrite(df)

数据被写入表中,当使用 tbl.scan().to_arrow() 读取表时:

pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]

由于目前还没有数据,你们都可以使用 append(df)overwrite(df)。如果想添加更多数据,可以再次使用 .append()

df = pa.Table.from_pylist(
    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)

tbl.append(df)

读取表 tbl.scan().to_arrow() 时,可以看到 Groningen 现在也包含在表中:

pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]

嵌套列表表示不同的 Arrow 缓冲区,其中首次写入会生成一个缓冲区,而第二次追加操作会写入另一个独立缓冲区。这是预期行为,因为系统需要读取两个 Parquet 文件。

为避免写入过程中出现类型错误,您可以使用 Iceberg 表模式来强制指定 PyArrow 表的类型:

from pyiceberg.catalog import load_catalog
import pyarrow as pa

catalog = load_catalog("default")
table = catalog.load_table("default.cities")
schema = table.schema().as_arrow()

df = pa.Table.from_pylist(
    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=schema
)

table.append(df)

您可以通过调用 tbl.delete() 并指定所需的 delete_filter 来删除表中的部分数据。


tbl.delete(delete_filter="city == 'Paris'")

在上面的例子中,所有城市字段值等于Paris的记录都将被删除。运行tbl.scan().to_arrow()现在会返回:

pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]

部分覆盖

使用 overwrite API 时,可以通过 overwrite_filter 先删除表中符合过滤条件的数据,然后再追加新数据到表中。

例如,对于以下方式创建的 Iceberg 表:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table("default.cities", schema=schema)

随着初始数据填充到表中:

import pyarrow as pa
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)
tbl.append(df)

你可以用 New York 的记录覆盖 Paris 的记录:

from pyiceberg.expressions import EqualTo
df = pa.Table.from_pylist(
    [
        {"city": "New York", "lat": 40.7128, "long": 74.0060},
    ]
)
tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))

这将通过 tbl.scan().to_arrow() 产生如下结果:

pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
lat: [[40.7128],[52.371807,37.773972,53.11254]]
long: [[74.006],[4.896029,-122.431297,6.0989]]

如果 PyIceberg 表已分区,您可以使用 tbl.dynamic_partition_overwrite(df) 来替换现有分区为数据框中提供的新分区。系统会自动从提供的 Arrow 表中检测需要替换的分区。例如,对于一个在 "city" 字段上指定了分区的 Iceberg 表:

from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, NestedField, StringType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table(
    "default.cities",
    schema=schema,
    partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="city_identity"))
)

我们想要覆盖 "Paris" 分区的数据:

import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": -48.864716, "long": -2.349014},
    ],
)
tbl.append(df)

然后我们可以用这个 arrow 表调用 dynamic_partition_overwrite

df_corrected = pa.Table.from_pylist([
    {"city": "Paris", "lat": 48.864716, "long": 2.349014}
])
tbl.dynamic_partition_overwrite(df_corrected)

这将通过 tbl.scan().to_arrow() 产生以下结果:

pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["Paris"],["Amsterdam"],["Drachten"],["San Francisco"]]
lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
long: [[2.349014],[4.896029],[6.0989],[-122.431297]]

Upsert

PyIceberg 支持 upsert 操作,这意味着它能够将 Arrow 表合并到 Iceberg 表中。系统会根据标识字段来判断行是否相同。如果表中已存在某行数据,则会更新该行;如果找不到匹配的行,则会插入新行。

假设有以下包含数据的表:

from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

import pyarrow as pa

schema = Schema(
    NestedField(1, "city", StringType(), required=True),
    NestedField(2, "inhabitants", IntegerType(), required=True),
    # Mark City as the identifier field, also known as the primary-key
    identifier_field_ids=[1]
)

tbl = catalog.create_table("default.cities", schema=schema)

arrow_schema = pa.schema(
    [
        pa.field("city", pa.string(), nullable=False),
        pa.field("inhabitants", pa.int32(), nullable=False),
    ]
)

# Write some data
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "inhabitants": 921402},
        {"city": "San Francisco", "inhabitants": 808988},
        {"city": "Drachten", "inhabitants": 45019},
        {"city": "Paris", "inhabitants": 2103000},
    ],
    schema=arrow_schema
)
tbl.append(df)

接下来,我们将向 Iceberg 表执行 upsert 操作:

df = pa.Table.from_pylist(
    [
        # Will be updated, the inhabitants has been updated
        {"city": "Drachten", "inhabitants": 45505},

        # New row, will be inserted
        {"city": "Berlin", "inhabitants": 3432000},

        # Ignored, already exists in the table
        {"city": "Paris", "inhabitants": 2103000},
    ],
    schema=arrow_schema
)
upd = tbl.upsert(df)

assert upd.rows_updated == 1
assert upd.rows_inserted == 1

PyIceberg 会自动检测哪些行需要更新、插入或可以直接忽略。


检查表结构

可以通过检查表来查看表的元数据信息。


时间旅行功能

要使用时间旅行功能检查表的元数据,请调用inspect table方法并传入snapshot_id参数。该功能支持除snapshotsrefs之外的所有元数据表。


table.inspect.entries(snapshot_id=805611270568163028)

快照

检查表的快照:

table.inspect.snapshots()
pyarrow.Table
committed_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
operation: string
manifest_list: string not null
summary: map<string, string>
  child 0, entries: struct<key: string not null, value: string> not null
      child 0, key: string not null
      child 1, value: string
----
committed_at: [[2024-03-15 15:01:25.682,2024-03-15 15:01:25.730,2024-03-15 15:01:25.772]]
snapshot_id: [[805611270568163028,3679426539959220963,5588071473139865870]]
parent_id: [[null,805611270568163028,3679426539959220963]]
operation: [["append","overwrite","append"]]
manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-805611270568163028-0-43637daf-ea4b-4ceb-b096-a60c25481eb5.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-3679426539959220963-0-8be81019-adf1-4bb6-a127-e15217bd50b3.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-5588071473139865870-0-1382dd7e-5fbc-4c51-9776-a832d7d0984e.avro"]]
summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]

分区

查看表的分区情况:

table.inspect.partitions()
pyarrow.Table
partition: struct<dt_month: int32, dt_day: date32[day]> not null
  child 0, dt_month: int32
  child 1, dt_day: date32[day]
spec_id: int32 not null
record_count: int64 not null
file_count: int32 not null
total_data_file_size_in_bytes: int64 not null
position_delete_record_count: int64 not null
position_delete_file_count: int32 not null
equality_delete_record_count: int64 not null
equality_delete_file_count: int32 not null
last_updated_at: timestamp[ms]
last_updated_snapshot_id: int64
----
partition: [
  -- is_valid: all not null
  -- child 0 type: int32
[null,null,612]
  -- child 1 type: date32[day]
[null,2021-02-01,null]]
spec_id: [[2,1,0]]
record_count: [[1,1,2]]
file_count: [[1,1,2]]
total_data_file_size_in_bytes: [[641,641,1260]]
position_delete_record_count: [[0,0,0]]
position_delete_file_count: [[0,0,0]]
equality_delete_record_count: [[0,0,0]]
equality_delete_file_count: [[0,0,0]]
last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]

条目

用于显示表中当前数据文件和删除文件的所有清单条目。


table.inspect.entries()
pyarrow.Table
status: int8 not null
snapshot_id: int64 not null
sequence_number: int64 not null
file_sequence_number: int64 not null
data_file: struct<content: int8 not null, file_path: string not null, file_format: string not null, partition: struct<> not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>, value_counts: map<int32, int64>, null_value_counts: map<int32, int64>, nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>, upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets: list<item: int64>, equality_ids: list<item: int32>, sort_order_id: int32> not null
  child 0, content: int8 not null
  child 1, file_path: string not null
  child 2, file_format: string not null
  child 3, partition: struct<> not null
  child 4, record_count: int64 not null
  child 5, file_size_in_bytes: int64 not null
  child 6, column_sizes: map<int32, int64>
      child 0, entries: struct<key: int32 not null, value: int64> not null
          child 0, key: int32 not null
          child 1, value: int64
  child 7, value_counts: map<int32, int64>
      child 0, entries: struct<key: int32 not null, value: int64> not null
          child 0, key: int32 not null
          child 1, value: int64
  child 8, null_value_counts: map<int32, int64>
      child 0, entries: struct<key: int32 not null, value: int64> not null
          child 0, key: int32 not null
          child 1, value: int64
  child 9, nan_value_counts: map<int32, int64>
      child 0, entries: struct<key: int32 not null, value: int64> not null
          child 0, key: int32 not null
          child 1, value: int64
  child 10, lower_bounds: map<int32, binary>
      child 0, entries: struct<key: int32 not null, value: binary> not null
          child 0, key: int32 not null
          child 1, value: binary
  child 11, upper_bounds: map<int32, binary>
      child 0, entries: struct<key: int32 not null, value: binary> not null
          child 0, key: int32 not null
          child 1, value: binary
  child 12, key_metadata: binary
  child 13, split_offsets: list<item: int64>
      child 0, item: int64
  child 14, equality_ids: list<item: int32>
      child 0, item: int32
  child 15, sort_order_id: int32
readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
  child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
      child 0, column_size: int64
      child 1, value_count: int64
      child 2, null_value_count: int64
      child 3, nan_value_count: int64
      child 4, lower_bound: string
      child 5, upper_bound: string
  child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
      child 0, column_size: int64
      child 1, value_count: int64
      child 2, null_value_count: int64
      child 3, nan_value_count: int64
      child 4, lower_bound: double
      child 5, upper_bound: double
  child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
      child 0, column_size: int64
      child 1, value_count: int64
      child 2, null_value_count: int64
      child 3, nan_value_count: int64
      child 4, lower_bound: double
      child 5, upper_bound: double
----
status: [[1]]
snapshot_id: [[6245626162224016531]]
sequence_number: [[1]]
file_sequence_number: [[1]]
data_file: [
  -- is_valid: all not null
  -- child 0 type: int8
[0]
  -- child 1 type: string
["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]
  -- child 2 type: string
["PARQUET"]
  -- child 3 type: struct<>
    -- is_valid: all not null
  -- child 4 type: int64
[4]
  -- child 5 type: int64
[1656]
  -- child 6 type: map<int32, int64>
[keys:[1,2,3]values:[140,135,135]]
  -- child 7 type: map<int32, int64>
[keys:[1,2,3]values:[4,4,4]]
  -- child 8 type: map<int32, int64>
[keys:[1,2,3]values:[0,0,0]]
  -- child 9 type: map<int32, int64>
[keys:[]values:[]]
  -- child 10 type: map<int32, binary>
[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]
  -- child 11 type: map<int32, binary>
[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]
  -- child 12 type: binary
[null]
  -- child 13 type: list<item: int64>
[[4]]
  -- child 14 type: list<item: int32>
[null]
  -- child 15 type: int32
[null]]
readable_metrics: [
  -- is_valid: all not null
  -- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string>
    -- is_valid: all not null
    -- child 0 type: int64
[140]
    -- child 1 type: int64
[4]
    -- child 2 type: int64
[0]
    -- child 3 type: int64
[null]
    -- child 4 type: string
["Amsterdam"]
    -- child 5 type: string
["San Francisco"]
  -- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
    -- is_valid: all not null
    -- child 0 type: int64
[135]
    -- child 1 type: int64
[4]
    -- child 2 type: int64
[0]
    -- child 3 type: int64
[null]
    -- child 4 type: double
[37.773972]
    -- child 5 type: double
[53.11254]
  -- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
    -- is_valid: all not null
    -- child 0 type: int64
[135]
    -- child 1 type: int64
[4]
    -- child 2 type: int64
[0]
    -- child 3 type: int64
[null]
    -- child 4 type: double
[-122.431297]
    -- child 5 type: double
[6.0989]]

引用

用于显示数据表已知的快照引用:

table.inspect.refs()
pyarrow.Table
name: string not null
type: string not null
snapshot_id: int64 not null
max_reference_age_in_ms: int64
min_snapshots_to_keep: int32
max_snapshot_age_in_ms: int64
----
name: [["main","testTag"]]
type: [["BRANCH","TAG"]]
snapshot_id: [[2278002651076891950,2278002651076891950]]
max_reference_age_in_ms: [[null,604800000]]
min_snapshots_to_keep: [[null,10]]
max_snapshot_age_in_ms: [[null,604800000]]

清单文件

要显示表的当前文件清单:

table.inspect.manifests()
pyarrow.Table
content: int8 not null
path: string not null
length: int64 not null
partition_spec_id: int32 not null
added_snapshot_id: int64 not null
added_data_files_count: int32 not null
existing_data_files_count: int32 not null
deleted_data_files_count: int32 not null
added_delete_files_count: int32 not null
existing_delete_files_count: int32 not null
deleted_delete_files_count: int32 not null
partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
  child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
      child 0, contains_null: bool not null
      child 1, contains_nan: bool
      child 2, lower_bound: string
      child 3, upper_bound: string
----
content: [[0]]
path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
length: [[6886]]
partition_spec_id: [[0]]
added_snapshot_id: [[3815834705531553721]]
added_data_files_count: [[1]]
existing_data_files_count: [[0]]
deleted_data_files_count: [[0]]
added_delete_files_count: [[0]]
existing_delete_files_count: [[0]]
deleted_delete_files_count: [[0]]
partition_summaries: [[    -- is_valid: all not null
    -- child 0 type: bool
[false]
    -- child 1 type: bool
[false]
    -- child 2 type: string
["test"]
    -- child 3 type: string
["test"]]]

元数据日志条目

用于显示表元数据日志条目:

table.inspect.metadata_log_entries()
pyarrow.Table
timestamp: timestamp[ms] not null
file: string not null
latest_snapshot_id: int64
latest_schema_id: int32
latest_sequence_number: int64
----
timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]]
file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]]
latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]]
latest_schema_id: [[null,0,0,0]]
latest_sequence_number: [[null,0,0,0]]

历史

要查看表的历史记录:

table.inspect.history()
pyarrow.Table
made_current_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
is_current_ancestor: bool not null
----
made_current_at: [[2024-06-18 16:17:48.768,2024-06-18 16:17:49.240,2024-06-18 16:17:49.343,2024-06-18 16:17:49.511]]
snapshot_id: [[4358109269873137077,3380769165026943338,4358109269873137077,3089420140651211776]]
parent_id: [[null,4358109269873137077,null,4358109269873137077]]
is_current_ancestor: [[true,false,true,true]]

文件

查看表当前快照中的数据文件:

table.inspect.files()
pyarrow.Table
content: int8 not null
file_path: string not null
file_format: dictionary<values=string, indices=int32, ordered=0> not null
spec_id: int32 not null
record_count: int64 not null
file_size_in_bytes: int64 not null
column_sizes: map<int32, int64>
  child 0, entries: struct<key: int32 not null, value: int64> not null
      child 0, key: int32 not null
      child 1, value: int64
value_counts: map<int32, int64>
  child 0, entries: struct<key: int32 not null, value: int64> not null
      child 0, key: int32 not null
      child 1, value: int64
null_value_counts: map<int32, int64>
  child 0, entries: struct<key: int32 not null, value: int64> not null
      child 0, key: int32 not null
      child 1, value: int64
nan_value_counts: map<int32, int64>
  child 0, entries: struct<key: int32 not null, value: int64> not null
      child 0, key: int32 not null
      child 1, value: int64
lower_bounds: map<int32, binary>
  child 0, entries: struct<key: int32 not null, value: binary> not null
      child 0, key: int32 not null
      child 1, value: binary
upper_bounds: map<int32, binary>
  child 0, entries: struct<key: int32 not null, value: binary> not null
      child 0, key: int32 not null
      child 1, value: binary
key_metadata: binary
split_offsets: list<item: int64>
  child 0, item: int64
equality_ids: list<item: int32>
  child 0, item: int32
sort_order_id: int32
readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
  child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
      child 0, column_size: int64
      child 1, value_count: int64
      child 2, null_value_count: int64
      child 3, nan_value_count: int64
      child 4, lower_bound: large_string
      child 5, upper_bound: large_string
  child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
      child 0, column_size: int64
      child 1, value_count: int64
      child 2, null_value_count: int64
      child 3, nan_value_count: int64
      child 4, lower_bound: double
      child 5, upper_bound: double
  child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
      child 0, column_size: int64
      child 1, value_count: int64
      child 2, null_value_count: int64
      child 3, nan_value_count: int64
      child 4, lower_bound: double
      child 5, upper_bound: double
----
content: [[0,0]]
file_path: [["s3://warehouse/default/table_metadata_files/data/00000-0-9ea7d222-6457-467f-bad5-6fb125c9aa5f.parquet","s3://warehouse/default/table_metadata_files/data/00000-0-afa8893c-de71-4710-97c9-6b01590d0c44.parquet"]]
file_format: [["PARQUET","PARQUET"]]
spec_id: [[0,0]]
record_count: [[3,3]]
file_size_in_bytes: [[5459,5459]]
column_sizes: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109]]]
value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3]]]
null_value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1]]]
nan_value_counts: [[keys:[]values:[],keys:[]values:[]]]
lower_bounds: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]]
upper_bounds:[[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]]
key_metadata: [[0100,0100]]
split_offsets:[[[],[]]]
equality_ids:[[[],[]]]
sort_order_id:[[[],[]]]
readable_metrics: [
  -- is_valid: all not null
  -- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string>
    -- is_valid: all not null
    -- child 0 type: int64
[140]
    -- child 1 type: int64
[4]
    -- child 2 type: int64
[0]
    -- child 3 type: int64
[null]
    -- child 4 type: large_string
["Amsterdam"]
    -- child 5 type: large_string
["San Francisco"]
  -- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
    -- is_valid: all not null
    -- child 0 type: int64
[135]
    -- child 1 type: int64
[4]
    -- child 2 type: int64
[0]
    -- child 3 type: int64
[null]
    -- child 4 type: double
[37.773972]
    -- child 5 type: double
[53.11254]
  -- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
    -- is_valid: all not null
    -- child 0 type: int64
[135]
    -- child 1 type: int64
[4]
    -- child 2 type: int64
[0]
    -- child 3 type: int64
[null]
    -- child 4 type: double
[-122.431297]
    -- child 5 type: double
[6.0989]]

信息

内容指数据文件存储的内容类型:0表示数据1表示位置删除2表示等值删除

若仅查看当前快照中的数据文件或删除文件,请分别使用table.inspect.data_files()table.inspect.delete_files()方法。


添加文件

高级 Iceberg 用户可以选择将现有的 Parquet 文件作为数据文件提交到 Iceberg 表中,而无需重写这些文件。


# Given that these parquet files have schema consistent with the Iceberg table

file_paths = [
    "s3a://warehouse/default/existing-1.parquet",
    "s3a://warehouse/default/existing-2.parquet",
]

# They can be added to the table without rewriting them

tbl.add_files(file_paths=file_paths)

# A new snapshot is committed to the table with manifests pointing to the existing parquet files

名称映射

由于add_files直接使用现有文件而不生成新的感知Iceberg模式的Parquet文件,因此要求Iceberg表必须配置名称映射(该映射将Parquet文件中的字段名与Iceberg字段ID对应)。因此,add_files要求Parquet文件的元数据中不能包含字段ID,若表未配置名称映射,则会基于当前表模式自动创建新的名称映射。

分区处理

add_files仅需通过读取现有Parquet文件的元数据页脚来推断每个文件的分区值。该实现还支持将文件添加到采用MonthTransformTruncateTransform等分区转换的Iceberg表(任何preserves_order属性设为True的转换都受支持)。请注意,若PartitionField源列的统计信息未存在于Parquet元数据中,分区值将被推断为None

维护操作影响

由于add_files将现有Parquet文件作为普通数据文件提交至Iceberg表,因此过期快照等破坏性维护操作会移除这些文件。


模式演进

PyIceberg 通过 Python API 提供了完整的模式演进支持。它会自动处理字段 ID 的设置,并确保只进行非破坏性变更(可手动覆盖此限制)。

在以下示例中,.update_schema() 方法直接从表对象调用。


with table.update_schema() as update:
    update.add_column("some_field", IntegerType(), "doc")

如果您需要进行的更改不仅限于模式演进,还可以启动一个事务:

with table.transaction() as transaction:
    with transaction.update_schema() as update_schema:
        update.add_column("some_other_field", IntegerType(), "doc")
    # ... Update properties etc

按名称合并

通过使用 .union_by_name() 方法,您可以将另一个模式合并到现有模式中,而无需担心字段ID的问题。


from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType, LongType

catalog = load_catalog()

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

table = catalog.create_table("default.locations", schema)

new_schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
    NestedField(10, "population", LongType(), required=False),
)

with table.update_schema() as update:
    update.union_by_name(new_schema)

现在该表已合并了两个模式print(table.schema())

table {
  1: city: optional string
  2: lat: optional double
  3: long: optional double
  4: population: optional long
}

添加列

使用 add_column 可以轻松添加列,无需担心字段ID问题。


with table.update_schema() as update:
    update.add_column("retries", IntegerType(), "Number of retries to place the bid")
    # In a struct
    update.add_column("details", StructType())

with table.update_schema() as update:
    update.add_column(("details", "confirmed_by"), StringType(), "Name of the exchange")

在向复合类型添加列之前,该复合类型必须已存在。复合类型中的字段以元组形式添加。


重命名列

在Iceberg表中重命名字段非常简单:

with table.update_schema() as update:
    update.rename_column("retries", "num_retries")
    # This will rename `confirmed_by` to `processed_by` in the `details` struct
    update.rename_column(("details", "confirmed_by"), "processed_by")

移动列

调整字段顺序:

with table.update_schema() as update:
    update.move_first("symbol")
    # This will move `bid` after `ask`
    update.move_after("bid", "ask")
    # This will move `confirmed_by` before `exchange` in the `details` struct
    update.move_before(("details", "confirmed_by"), ("details", "exchange"))

更新列

更新字段的类型、描述或必填属性。


with table.update_schema() as update:
    # Promote a float to a double
    update.update_column("bid", field_type=DoubleType())
    # Make a field optional
    update.update_column("symbol", required=False)
    # Update the documentation
    update.update_column("symbol", doc="Name of the share on the exchange")

请注意,某些操作并不兼容,但您仍可通过设置 allow_incompatible_changes 自行承担风险来执行。


with table.update_schema(allow_incompatible_changes=True) as update:
    # Incompatible change, cannot require an optional field
    update.update_column("symbol", required=True)

删除列

删除一个字段,注意这是不兼容的变更(读取器/写入器可能会依赖该字段):

with table.update_schema(allow_incompatible_changes=True) as update:
    update.delete_column("some_field")
    # In a struct
    update.delete_column(("details", "confirmed_by"))

分区演进

PyIceberg 支持分区演进功能。更多细节请参阅分区演进规范

在演进分区时,需要使用表上的 update_spec API。


with table.update_spec() as update:
    update.add_field("id", BucketTransform(16), "bucketed_id")
    update.add_field("event_ts", DayTransform(), "day_ts")

更新分区规格也可以作为与其他操作一起的事务的一部分来完成。


with table.transaction() as transaction:
    with transaction.update_spec() as update_spec:
        update_spec.add_field("id", BucketTransform(16), "bucketed_id")
        update_spec.add_field("event_ts", DayTransform(), "day_ts")
    # ... Update properties etc

添加字段

可以通过 add_field API 添加新的分区字段,该接口接收要分区的字段名称、分区转换函数以及可选的分区名称。如果未指定分区名称,系统会自动生成一个。


with table.update_spec() as update:
    update.add_field("id", BucketTransform(16), "bucketed_id")
    update.add_field("event_ts", DayTransform(), "day_ts")
    # identity is a shortcut API for adding an IdentityTransform
    update.identity("some_field")

移除字段

如果某些字段不再适合作为分区依据,也可以通过 remove_field API 来移除这些分区字段。


with table.update_spec() as update:
    # Remove the partition field with the name
    update.remove_field("some_partition_name")

重命名字段

可以通过rename_field API来重命名分区字段。


with table.update_spec() as update:
    # Rename the partition field with the name bucketed_id to sharded_id
    update.rename_field("bucketed_id", "sharded_id")

表属性

通过Transaction API设置和移除属性:

with table.transaction() as transaction:
    transaction.set_properties(abc="def")

assert table.properties == {"abc": "def"}

with table.transaction() as transaction:
    transaction.remove_properties("abc")

assert table.properties == {}

或者,不使用上下文管理器:

table = table.transaction().set_properties(abc="def").commit_transaction()

assert table.properties == {"abc": "def"}

table = table.transaction().remove_properties("abc").commit_transaction()

assert table.properties == {}

快照属性

在使用appendoverwrite API写入表时,可以选择性地设置快照属性:

tbl.append(df, snapshot_properties={"abc": "def"})

# or

tbl.overwrite(df, snapshot_properties={"abc": "def"})

assert tbl.metadata.snapshots[-1].summary["abc"] == "def"

快照管理

通过 Table API 管理快照操作:

# To run a specific operation
table.manage_snapshots().create_tag(snapshot_id, "tag123").commit()
# To run multiple operations
table.manage_snapshots()
    .create_tag(snapshot_id1, "tag123")
    .create_tag(snapshot_id2, "tag456")
    .commit()
# Operations are applied on commit.

你也可以使用上下文管理器来进行更多修改:

with table.manage_snapshots() as ms:
    ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")

视图

PyIceberg 支持视图操作。


检查视图是否存在


from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
catalog.view_exists("default.bar")

表统计信息管理

通过Table API管理表统计信息的操作:

# To run a specific operation
table.update_statistics().set_statistics(statistics_file=statistics_file).commit()
# To run multiple operations
table.update_statistics()
  .set_statistics(statistics_file1)
  .remove_statistics(snapshot_id2)
  .commit()
# Operations are applied on commit.

你也可以使用上下文管理器来进行更多修改:

with table.update_statistics() as update:
    update.set_statistics(statistics_file)
    update.remove_statistics(snapshot_id2)

查询数据

要查询表数据,需要进行表扫描操作。表扫描可以接收以下参数:过滤器条件、列名、可选的记录条数限制以及快照ID。


from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")

scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
    limit=100,
)

# Or filter using a string predicate
scan = table.scan(
    row_filter="trip_distance > 10.0",
)

[task.file.file_path for task in scan.plan_files()]

底层 API plan_files 方法返回一组任务,这些任务提供可能包含匹配行的文件:

[
  "s3://warehouse/wh/nyc/taxis/data/00003-4-42464649-92dd-41ad-b83b-dea1a2fe4b58-00001.parquet"
]

在这种情况下,引擎需要自行过滤文件本身。以下 to_arrow()to_duckdb() 方法已为您实现了这一功能。


Apache Arrow

要求

需要安装pyarrow

使用PyIceberg可以从大表中筛选数据并提取到PyArrow表中:

table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow()

这将返回一个 PyArrow 表:

pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[us, tz=+00:00]
tpep_dropoff_datetime: timestamp[us, tz=+00:00]
----
VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]]
tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]]

这将仅拉取可能包含匹配行的文件。

如果更倾向于一次读取一个记录批次,也可以返回一个PyArrow RecordBatchReader:

table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader()

Pandas


需求

使用此功能需要安装pandas

PyIceberg 能轻松从海量表中筛选数据并加载到本地的 Pandas 数据框中。该操作仅会获取查询所需的 Parquet 文件并应用过滤条件,从而减少 IO 操作,提升性能并降低成本。


table.scan(
    row_filter="trip_distance >= 10.0",
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_pandas()

这将返回一个 Pandas 数据框:


***
VendorID      tpep_pickup_datetime     tpep_dropoff_datetime
0              2 2021-04-01 00:28:05+00:00 2021-04-01 00:47:59+00:00
1              1 2021-04-01 00:39:01+00:00 2021-04-01 00:57:39+00:00
2              2 2021-04-01 00:14:42+00:00 2021-04-01 00:42:59+00:00
3              1 2021-04-01 00:17:17+00:00 2021-04-01 00:43:38+00:00
4              1 2021-04-01 00:24:04+00:00 2021-04-01 00:56:20+00:00
...          ...                       ...                       ...
116976         2 2021-04-30 23:56:18+00:00 2021-05-01 00:29:13+00:00
116977         2 2021-04-30 23:07:41+00:00 2021-04-30 23:37:18+00:00
116978         2 2021-04-30 23:38:28+00:00 2021-05-01 00:12:04+00:00
116979         2 2021-04-30 23:33:00+00:00 2021-04-30 23:59:00+00:00
116980         2 2021-04-30 23:44:25+00:00 2021-05-01 00:14:47+00:00

[116981 rows x 3 columns]

建议使用 Pandas 2 或更高版本,因为它将数据存储在 Apache Arrow 后端 中,从而避免了数据拷贝。


DuckDB

要求

需要安装DuckDB

表扫描结果也可以转换为内存中的DuckDB表:

con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")

我们可以使用游标在DuckDB表上运行查询:

print(
    con.execute(
        "SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4"
    ).fetchall()
)
[
    (datetime.timedelta(seconds=1194),),
    (datetime.timedelta(seconds=1118),),
    (datetime.timedelta(seconds=1697),),
    (datetime.timedelta(seconds=1581),),
]

Ray

要求

这需要安装Ray

表扫描也可以转换为Ray数据集:

ray_dataset = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_ray()

这将返回一个 Ray 数据集:


***
Dataset(
    num_blocks=1,
    num_rows=1168798,
    schema={
        VendorID: int64,
        tpep_pickup_datetime: timestamp[us, tz=UTC],
        tpep_dropoff_datetime: timestamp[us, tz=UTC]
    }
)

使用 Ray Dataset API 与数据集交互:

print(ray_dataset.take(2))
[
    {
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 23, 50),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 0, 34, 31),
    },
    {
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 5, 3),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 16, 10, 18),
    },
]

Daft

PyIceberg 与 Daft Dataframes 紧密集成(参见:Daft 与 Iceberg 的集成),它在 PyIceberg 表之上提供了一个完整的惰性优化查询引擎接口。

要求

这需要安装 Daft

可以轻松将表读取到 Daft Dataframe 中:

df = table.to_daft()  # equivalent to `daft.read_iceberg(table)`
df = df.where(df["trip_distance"] >= 10.0)
df = df.select("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime")

这将返回一个延迟物化的 Daft Dataframe。打印 df 会显示其结构模式:

╭──────────┬───────────────────────────────┬───────────────────────────────╮
│ VendorID ┆ tpep_pickup_datetime          ┆ tpep_dropoff_datetime         │
│ ---      ┆ ---                           ┆ ---                           │
│ Int64    ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │
╰──────────┴───────────────────────────────┴───────────────────────────────╯

(No data to display: Dataframe not materialized)

我们可以使用 df.show() 执行 Dataframe 来预览查询的前几行数据。

该操作已正确优化,能够充分利用 Iceberg 的特性,如隐藏分区和文件级统计信息,以实现高效读取。


df.show(2)
╭──────────┬───────────────────────────────┬───────────────────────────────╮
│ VendorID ┆ tpep_pickup_datetime          ┆ tpep_dropoff_datetime         │
│ ---------                           │
│ Int64    ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │
╞══════════╪═══════════════════════════════╪═══════════════════════════════╡
│ 22008-12-31T23:23:50.0000002009-01-01T00:34:31.000000    │
├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 22008-12-31T23:05:03.0000002009-01-01T16:10:18.000000    │
╰──────────┴───────────────────────────────┴───────────────────────────────╯

(Showing first 2 rows)

Polars

PyIceberg 与 Polars 的 Dataframe 和 LazyFrame 紧密集成,在 PyIceberg 表之上提供了完整的惰性优化查询引擎接口。

要求

需要安装 polars


pip install pyiceberg['polars']

可以通过 DataFrame 或 LazyFrame 在 Polars 中分析和访问 PyIceberg 数据。如果您的代码使用 Apache Iceberg 数据扫描和检索 API,然后在 Polars 中分析结果 DataFrame,请使用 table.scan().to_polars() API。如果目的是利用 Polars 的高性能过滤和检索功能,请使用从 Iceberg 表导出的 LazyFrame 并配合 table.to_polars() API。


# Get LazyFrame
iceberg_table.to_polars()

# Get Data Frame
iceberg_table.scan().to_polars()

使用 Polars DataFrame 处理数据

PyIceberg 可以轻松地从海量表中筛选数据,并将其加载到本地的 Polars 数据框中。该操作仅会获取查询所需的 Parquet 文件并应用过滤条件,从而减少 I/O 操作,提升性能并降低成本。


schema = Schema(
    NestedField(field_id=1, name='ticket_id', field_type=LongType(), required=True),
    NestedField(field_id=2, name='customer_id', field_type=LongType(), required=True),
    NestedField(field_id=3, name='issue', field_type=StringType(), required=False),
    NestedField(field_id=4, name='created_at', field_type=TimestampType(), required=True),
  required=True
)

iceberg_table = catalog.create_table(
    identifier='default.product_support_issues',
    schema=schema
)

pa_table_data = pa.Table.from_pylist(
    [
        {'ticket_id': 1, 'customer_id': 546, 'issue': 'User Login issue', 'created_at': 1650020000000000},
        {'ticket_id': 2, 'customer_id': 547, 'issue': 'Payment not going through', 'created_at': 1650028640000000},
        {'ticket_id': 3, 'customer_id': 548, 'issue': 'Error on checkout', 'created_at': 1650037280000000},
        {'ticket_id': 4, 'customer_id': 549, 'issue': 'Unable to reset password', 'created_at': 1650045920000000},
        {'ticket_id': 5, 'customer_id': 550, 'issue': 'Account locked', 'created_at': 1650054560000000},
        {'ticket_id': 6, 'customer_id': 551, 'issue': 'Order not received', 'created_at': 1650063200000000},
        {'ticket_id': 7, 'customer_id': 552, 'issue': 'Refund not processed', 'created_at': 1650071840000000},
        {'ticket_id': 8, 'customer_id': 553, 'issue': 'Shipping address issue', 'created_at': 1650080480000000},
        {'ticket_id': 9, 'customer_id': 554, 'issue': 'Product damaged', 'created_at': 1650089120000000},
        {'ticket_id': 10, 'customer_id': 555, 'issue': 'Unable to apply discount code', 'created_at': 1650097760000000},
        {'ticket_id': 11, 'customer_id': 556, 'issue': 'Website not loading', 'created_at': 1650106400000000},
        {'ticket_id': 12, 'customer_id': 557, 'issue': 'Incorrect order received', 'created_at': 1650115040000000},
        {'ticket_id': 13, 'customer_id': 558, 'issue': 'Unable to track order', 'created_at': 1650123680000000},
        {'ticket_id': 14, 'customer_id': 559, 'issue': 'Order delayed', 'created_at': 1650132320000000},
        {'ticket_id': 15, 'customer_id': 560, 'issue': 'Product not as described', 'created_at': 1650140960000000},
        {'ticket_id': 16, 'customer_id': 561, 'issue': 'Unable to contact support', 'created_at': 1650149600000000},
        {'ticket_id': 17, 'customer_id': 562, 'issue': 'Duplicate charge', 'created_at': 1650158240000000},
        {'ticket_id': 18, 'customer_id': 563, 'issue': 'Unable to update profile', 'created_at': 1650166880000000},
        {'ticket_id': 19, 'customer_id': 564, 'issue': 'App crashing', 'created_at': 1650175520000000},
        {'ticket_id': 20, 'customer_id': 565, 'issue': 'Unable to download invoice', 'created_at': 1650184160000000},
        {'ticket_id': 21, 'customer_id': 566, 'issue': 'Incorrect billing amount', 'created_at': 1650192800000000},
    ], schema=iceberg_table.schema().as_arrow()
)

iceberg_table.append(
    df=pa_table_data
)

table.scan(
    row_filter="ticket_id > 10",
).to_polars()

这将返回一个 Polars DataFrame:

shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
│ …         ┆ …           ┆ …                          ┆ …                   │
│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘

使用 Polars LazyFrame 工作

PyIceberg 支持基于 Iceberg 表创建 Polars LazyFrame。

使用上述代码示例:

lf = iceberg_table.to_polars().filter(pl.col("ticket_id") > 10)
print(lf.collect())

上述代码片段返回一个 Polars LazyFrame,并定义了一个将由 Polars 执行的过滤器:

shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
│ …         ┆ …           ┆ …                          ┆ …                   │
│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘

2025-05-28(三)

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

富婆E

请我喝杯伯爵奶茶~!

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值