Written By: Diptiman Raichaudhuri, Staff Developer Advocate at Confluent
This article shows data engineers how to use PyIceberg, a lightweight and powerful Python library. PyIceberg makes it easier to perform common data tasks like creating, reading, modifying, or deleting data in Apache Iceberg, without needing a big cluster.
Driven by complex business demands and the need to analyze larger volumes of information, data platforms have transformed significantly over the past few years to help businesses extract more insights and value from their diverse sources of data.
For enterprise analytics use cases, the open data lakehouse platform has been at the forefront of this platform evolution. An open data lakehouse enables data teams to create ‘composable’ architectures within their ecosystem. With this pattern, data teams can design data platforms with storage, compute and data governance layers of their choice, to cater to businesses’ ever-evolving needs. Open Table Formats (OTF) like Apache Iceberg are the engines driving the adoption of modern open lakehouses.
Composable Data Platform Architecture
Data platforms built with Apache Iceberg normally have three layers:
- The data layer: Physical data files (normally Parquet, Avro, or ORC formats) are stored in this layer.
- The metadata layer: Instead of sorting tables into individual directories, Iceberg maintains a list of files. The metadata layer manages table snapshots, schema, and file statistics.
- Iceberg Catalog: The catalog is the central repository that facilitates the discovery, creation, and modification of tables, and ensures transactional consistency when managing Iceberg tables.
This popular diagram from the Apache Iceberg documentation illustrates these layers:
Apache Iceberg Layers (Source)
What is PyIceberg?
PyIceberg empowers analytics and data engineers to build sophisticated open lakehouse platforms on a wide variety of clouds (AWS, Azure, and Google Cloud) and on-premises storage. PyIceberg is a Python implementation for accessing Iceberg tables. Developers using PyIceberg can use Pythonic data transformations without the need to run highly performant query engines in Java Virtual Machine (JVM) clusters. PyIceberg uses catalogs to load Iceberg tables and perform read-write-upsert operations. It handles the metadata and table format aspects of Iceberg, allowing data engineers to use small, fast, and highly efficient data engines like PyArrow (the Python implementation of Apache Arrow), Daft, or DuckDB as the compute layer for the actual data processing.
PyIceberg can run as a standalone program or on Kubernetes clusters for fault tolerance. Its native integration with Iceberg catalog protocols like REST, SQLCatalog, or AWS Glue makes it a popular and easy choice for querying Iceberg tables without the need for JVM/py4j clusters.
Production deployments of PyIceberg often integrate data streaming workloads like Tableflow, where streaming data is produced as Apache Kafka topics and materialized as Iceberg tables. This is a powerful tool to bridge the gap between operational data systems and analytical data systems.
Why PyIceberg?
PyIceberg provides a Python-friendly way to run data manipulation language (DML) operations on Iceberg tables. For small to medium-sized data platforms that work with hundreds of gigabytes of data (such as those handling departmental analytics, internal reporting, or specialized tooling), ease of use is often more important for businesses than complex features. If the data volume (both historical and incremental) is not huge, deploying a full-blown cluster to run queries on Iceberg can be overwhelming and overkill. That’s because query engines like Spark and Flink rely on Scala or Java running on the Java Virtual Machine (JVM) to manage and optimize multithreaded, multicore processing. For Python programmers, this has meant using Py4J, which enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.
In the next section, let’s build a demo with PyIceberg to understand how Iceberg’s read and write patterns work.
How to build a data lakehouse with PyIceberg, PyArrow and DuckDB
Let’s practice and build a demo of an IoT sensor data lakehouse with PyIceberg. For this example, we will use PyIceberg and PyArrow to insert/upsert and delete Iceberg data and build in Visual Studio Code (VS Code).
First, a new Python virtual environment called ‘iceberg_playground’ is created by running the following command:
$>python -m venv iceberg_playground
Then this directory, ‘iceberg_playground’, is opened in VS Code, where the PyIceberg project will be hosted. The following image shows the VS Code ‘clean slate.’
PyIceberg and other libraries are then installed in the virtual environment by running the following two commands:
$>source bin/activate
(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas
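Before going further, it can help to confirm that the packages actually landed in the active virtual environment. The following is a minimal sketch that uses only the Python standard library. The helper name installed_versions is hypothetical, and the package list simply mirrors the pip command above.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # The distribution is not installed in the current environment
            found[name] = None
    return found


if __name__ == "__main__":
    # Package names are taken from the pip command in this article
    wanted = ["pyiceberg", "daft", "duckdb", "sqlalchemy", "pandas"]
    for name, ver in installed_versions(wanted).items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```

Any package reported as NOT INSTALLED usually means the virtual environment was not activated before running pip.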
For this example, PyIceberg uses SqlCatalog, which stores Iceberg table information in a local SQLite database. Iceberg also supports other catalogs, including REST, Hive, and AWS Glue.
A configuration file .pyiceberg.yaml is prepared with the following content, at the project root:
catalog:
  pyarrowtest:
    uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db
    warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1
Notice that the Iceberg catalog is stored as a SQLite file under the pyiceberg_catalog directory, while the warehouse, which holds all data and metadata, lives in the dw1 directory.
Both of these directories are now created at the project root. This catalog is named pyarrowtest.
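The directory and configuration setup can also be scripted. The sketch below is one possible way to do it with the standard library only: the function name prepare_project is hypothetical, and the directory names, database file name, and catalog name are the ones used in this article. It keeps the relative SQLite URI from the YAML above, which means the catalog path is resolved against the directory the scripts are run from.

```python
from pathlib import Path


def prepare_project(root: Path, catalog_name: str = "pyarrowtest") -> str:
    """Create the catalog and warehouse directories and write .pyiceberg.yaml."""
    root = root.resolve()
    catalog_dir = root / "pyiceberg_catalog"  # will hold the SQLite catalog file
    warehouse_dir = root / "dw1"              # will hold table data and metadata
    catalog_dir.mkdir(parents=True, exist_ok=True)
    warehouse_dir.mkdir(parents=True, exist_ok=True)

    # Same structure as the YAML shown in the article; the warehouse URI is
    # derived from the actual project location instead of being hard-coded.
    config = (
        "catalog:\n"
        f"  {catalog_name}:\n"
        "    uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db\n"
        f"    warehouse: {warehouse_dir.as_uri()}\n"
    )
    (root / ".pyiceberg.yaml").write_text(config)
    return config
```

Calling prepare_project(Path.cwd()) from the project root creates both directories and writes the configuration file in one step.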
Next, the PyIceberg setup is checked using the following script:
import os
from pyiceberg.catalog import load_catalog
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
Notice how PyIceberg reads the catalog name from the YAML file and creates a local SQLite database in the pyiceberg_catalog directory. Since SQLite is distributed with Python installers, it does not need to be installed separately.
If the script runs properly, the properties of the pyarrowtest catalog should be displayed in the terminal.
The script loaded the catalog from the .pyiceberg.yaml file because the PYICEBERG_HOME environment variable was set to the project root.
Next, a schema is added using PyIceberg’s Schema class. Since this example stores data from a set of IoT sensors, the schema is constructed with three columns and their required data types. The field device_id has been set both as a primary key and as a partition key.
Then, a namespace is created along with the Iceberg table with the schema. A namespace is a logical grouping of tables within a warehouse (remember how a warehouse is already created when defining the YAML file).
The initial data load is done using an in-memory PyArrow table, and sensor_table is then read with the PyIceberg scan() method, whose result is converted into a pandas DataFrame.
import os
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (NestedField,
StringType, FloatType)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
import pyarrow as pa
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
# Define the schema
schema = Schema(
    NestedField(1, "device_id", StringType(), required=True),
    NestedField(2, "ampere_hour", FloatType(), required=True),
    NestedField(3, "device_make", StringType(), required=True),
    identifier_field_ids=[1]  # 'device_id' is the primary key
)
# Define the partition spec - device_id as partition key
partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")
)
# Create a namespace and an iceberg table
catalog.create_namespace_if_not_exists('sensor_ns')
sensor_table = catalog.create_table_if_not_exists(
    identifier='sensor_ns.sensor_data',
    schema=schema,
    partition_spec=partition_spec
)
# Insert initial data
initial_data = pa.table([
    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
    pa.array([21.43, 17.86, 31.65]),
    pa.array(['ABB', 'Honeywell', 'Siemens'])
], schema=schema.as_arrow())
# Write initial data
sensor_table.overwrite(initial_data)
# Print a Pandas dataframe representation of the table data
print("\nInsert operation completed")
print(sensor_table.scan().to_pandas())
If the above script is run successfully, this result is displayed in the terminal:
With the insertion successfully completed, the three layers of Iceberg can be validated:
- Catalog: It is the SQLite file pyarrow_catalog.db, which will be verified a little later in this article.
- Metadata: Within the ‘metadata’ directory, metadata files are created, which are crucial for enabling Create, Read, Update, Delete (CRUD) operations. Two metadata JSON files are created, one while the table was created, and the other after the first insertion of data. The ‘snap-*.avro’ file is the manifest list, and the manifest file is the other .avro file.
- Data: Data files are written in the Parquet format, with device_id as the partition key. Since there are three distinct devices, three directories are created. The Iceberg table ‘sensor_data’ is created under the namespace directory ‘sensor_ns.db’ in the warehouse ‘dw1’. These data files are created in the ‘data’ directory of the ‘sensor_data’ table.
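The metadata and data layers described above can also be checked from Python rather than from the file explorer. The following is a rough sketch that counts files by extension under a table directory. The function name summarize_table_dir is hypothetical, and the ‘metadata’ and ‘data’ subdirectory names are taken from the description above.

```python
from collections import Counter
from pathlib import Path


def summarize_table_dir(table_dir: Path) -> dict:
    """Count files by extension under the table's metadata/ and data/ directories."""
    summary = {}
    for layer in ("metadata", "data"):
        # rglob walks partition subdirectories (one per device_id) as well
        files = [p for p in (table_dir / layer).rglob("*") if p.is_file()]
        summary[layer] = Counter(p.suffix for p in files)
    return summary
```

For the table in this article, the call would look like summarize_table_dir(Path("dw1/sensor_ns.db/sensor_data")), assuming the script is run from the project root. The metadata entry should then list .json and .avro files, and the data entry should list .parquet files.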
PyIceberg expressions can be used to filter records. Some of the common expressions used for filtering are: StartsWith, EqualTo, GreaterThan, And, Or, etc.
from pyiceberg.expressions import StartsWith, EqualTo
# Filter columns
print("\nFilter records with device_make == ABB ")
print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())
PyIceberg supports UPSERT operations as well. The following code sample updates the existing device_make for one of the sensors from ‘Siemens’ to ‘Kepware’.
# Create an UPSERT batch of Arrow records where one of the device_make values is changed
upsert_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Kepware'])
], schema=schema.as_arrow())
# UPSERT changed data
try:
    # Match incoming rows to existing rows on device_id
    join_columns = ["device_id"]
    upsert_result = sensor_table.upsert(
        upsert_data.select(["device_id", "ampere_hour", "device_make"]),
        join_cols=join_columns,
    )
except Exception as e:
    print(e)
print("\nUpsert operation completed")
print(sensor_table.scan().to_pandas())
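The object returned by upsert() can be used to confirm what the operation actually changed. The following small sketch assumes the result exposes rows_updated and rows_inserted counters, as PyIceberg's UpsertResult does in recent releases. The helper name summarize_upsert is hypothetical.

```python
def summarize_upsert(result) -> str:
    """Format the counters of an upsert result for logging.

    Assumes `result` has `rows_updated` and `rows_inserted` attributes.
    """
    return (
        f"rows updated: {result.rows_updated}, "
        f"rows inserted: {result.rows_inserted}"
    )
```

In the script above, print(summarize_upsert(upsert_result)) could be placed inside the try block, right after the upsert() call, so that it only runs when the upsert succeeded.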
In a similar manner, the DELETE operation is also supported:
# Delete row
sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))
print("\nAfter Delete")
print(sensor_table.scan().to_pandas())
It is worth mentioning that deletion in any warehouse is a nuanced operation, and Iceberg is no exception. Row-level operations in Iceberg are defined by two strategies: Copy-on-Write (CoW) and Merge-on-Read (MoR). Under CoW, the data files that contain affected rows are rewritten. Under MoR, data files are left in place and delete files are written instead, which readers apply at query time.
PyIceberg’s support for MoR deletes currently comes with some nuances. While PyIceberg offers the ability to delete rows, it implements this using CoW deletes by default, meaning data files are rewritten instead of delete files being created. However, there’s work underway to enhance PyIceberg’s MoR support to include equality deletes and make it more efficient for frequent, small updates.
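The difference between the two strategies can be illustrated with a toy model. The code below is not how PyIceberg or Iceberg implement deletes. It is a simplified in-memory sketch, with hypothetical names throughout, in which data files are lists of rows and MoR delete files are modeled as sets of deleted device_id values (similar in spirit to equality deletes).

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    data_files: dict                                   # file name -> list of row dicts
    delete_files: list = field(default_factory=list)   # each entry: set of deleted ids


def delete_copy_on_write(table: ToyTable, device_id: str) -> ToyTable:
    """CoW: rewrite every data file that contains a matching row."""
    rewritten = {}
    for name, rows in table.data_files.items():
        kept = [r for r in rows if r["device_id"] != device_id]
        if len(kept) == len(rows):
            rewritten[name] = rows                  # file untouched, reused as-is
        elif kept:
            rewritten[name + ".rewritten"] = kept   # new file without the deleted rows
        # a file with no surviving rows is simply dropped
    return ToyTable(rewritten, list(table.delete_files))


def delete_merge_on_read(table: ToyTable, device_id: str) -> ToyTable:
    """MoR: keep data files as they are and record the delete separately."""
    return ToyTable(dict(table.data_files), table.delete_files + [{device_id}])


def read_rows(table: ToyTable) -> list:
    """Readers merge data files with delete files at query time."""
    deleted = set().union(*table.delete_files)
    return [
        row
        for rows in table.data_files.values()
        for row in rows
        if row["device_id"] not in deleted
    ]
```

Both strategies return the same rows on read. CoW pays the cost at write time by rewriting files, while MoR keeps writes cheap and shifts the merge work to every read.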
As a last step, let’s use DuckDB to query the SQLite Iceberg catalog, stored in the pyarrow_catalog.db file. The following command is run on the VS Code terminal:
duckdb -ui pyiceberg_catalog/pyarrow_catalog.db
This will open up a browser window at port 4213 (default), where a SQL query could be run on the Iceberg catalog, as shown:
This provides a simple way of extracting insights from the SQL catalog.
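The same catalog file can also be inspected without leaving Python, using the built-in sqlite3 module. The sketch below assumes that SqlCatalog keeps its table registry in a table named iceberg_tables with table_namespace, table_name, and metadata_location columns. If the layout differs in your PyIceberg version, list_sqlite_tables shows what is actually there. Both function names are hypothetical.

```python
import sqlite3
from contextlib import closing


def list_sqlite_tables(db_path: str) -> list:
    """Return the names of all tables stored in the SQLite catalog file."""
    with closing(sqlite3.connect(db_path)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]


def list_iceberg_tables(db_path: str) -> list:
    """Return (namespace, table, metadata_location) for each registered table.

    Assumes the SqlCatalog table and column names described above.
    """
    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(
            "SELECT table_namespace, table_name, metadata_location "
            "FROM iceberg_tables ORDER BY table_namespace, table_name"
        ).fetchall()
```

For this article's project, both functions would be called with "pyiceberg_catalog/pyarrow_catalog.db" from the project root.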
Unlocking Data Insights with PyIceberg
For companies with data volumes that are smaller than terabytes, PyIceberg and PyArrow are quick options to run interactive queries in Python. Using this combination can significantly reduce the time to generate insights for small to medium-sized lakehouses.
Data engineers can get started with the PyIceberg documentation, which is actively maintained. Also, the API page has great examples of all PyIceberg APIs.
Happy coding!