Iceberg (dagster-iceberg)
This library provides an integration with the Iceberg table format.
For more information on getting started, see the Dagster & Iceberg documentation.
Note: This is a community-supported integration. For support, see the Dagster Community Integrations repository.
I/O Managers
- dagster_iceberg.io_manager.arrow.PyArrowIcebergIOManager IOManagerDefinition [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using PyArrow.

  Examples:

      import pandas as pd
      import pyarrow as pa
      from dagster import Definitions, asset
      from dagster_iceberg.config import IcebergCatalogConfig
      from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

      CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
      CATALOG_WAREHOUSE = (
          "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
      )

      resources = {
          "io_manager": PyArrowIcebergIOManager(
              name="test",
              config=IcebergCatalogConfig(
                  properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
              ),
              namespace="dagster",
          )
      }

      @asset
      def iris_dataset() -> pa.Table:
          # Return the table so the I/O manager can write it to Iceberg.
          return pa.Table.from_pandas(
              pd.read_csv(
                  "https://docs.dagster.io/assets/iris.csv",
                  names=[
                      "sepal_length_cm",
                      "sepal_width_cm",
                      "petal_length_cm",
                      "petal_width_cm",
                      "species",
                  ],
              )
          )

      defs = Definitions(assets=[iris_dataset], resources=resources)

  If you do not provide a schema, Dagster determines one from the assets and ops that use the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified with a “schema” entry in the output metadata. If none of these is provided, the schema defaults to “public”. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

      from dagster import Out, op

      @op(
          out={"my_table": Out(metadata={"schema": "my_schema"})}
      )
      def make_my_table() -> pa.Table:
          ...

  To use only specific columns of a table as input to a downstream op or asset, add the “columns” metadata to the In or AssetIn.

      from dagster import AssetIn, asset

      @asset(
          ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
      )
      def my_table_a(my_table: pa.Table):
          # my_table will just contain the data from column "a"
          ...
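The schema fallback order described for this I/O manager can be read as a small decision function. The following is a minimal sketch of one plausible reading of that prose, not the library's implementation. The helper name `resolve_schema` and its parameters are hypothetical. It assumes that for assets the schema is the key component directly before the asset name, and that a configured schema wins over the other sources; the text does not say how conflicts are handled.

```python
from typing import Mapping, Optional, Sequence


def resolve_schema(
    configured_schema: Optional[str] = None,
    asset_key_path: Optional[Sequence[str]] = None,
    output_metadata: Optional[Mapping[str, object]] = None,
) -> str:
    """Hypothetical helper mirroring the documented fallback order."""
    # 1. An explicitly configured schema (assumed here to take priority).
    if configured_schema:
        return configured_schema
    # 2. Assets: assume the component before the asset name is the schema.
    if asset_key_path and len(asset_key_path) > 1:
        return asset_key_path[-2]
    # 3. Ops: a "schema" entry in the output metadata.
    if output_metadata and output_metadata.get("schema"):
        return str(output_metadata["schema"])
    # 4. Nothing provided: fall back to the documented default.
    return "public"
```

With this sketch, `resolve_schema(output_metadata={"schema": "my_schema"})` yields `"my_schema"`, and calling it with no arguments yields `"public"`.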
- dagster_iceberg.io_manager.daft.DaftIcebergIOManager IOManagerDefinition [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using Daft.

  Examples:

      import daft as da
      import pandas as pd
      from dagster import Definitions, asset
      from dagster_iceberg.config import IcebergCatalogConfig
      from dagster_iceberg.io_manager.daft import DaftIcebergIOManager

      CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
      CATALOG_WAREHOUSE = (
          "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
      )

      resources = {
          "io_manager": DaftIcebergIOManager(
              name="test",
              config=IcebergCatalogConfig(
                  properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
              ),
              namespace="dagster",
          )
      }

      @asset
      def iris_dataset() -> da.DataFrame:
          return da.from_pandas(
              pd.read_csv(
                  "https://docs.dagster.io/assets/iris.csv",
                  names=[
                      "sepal_length_cm",
                      "sepal_width_cm",
                      "petal_length_cm",
                      "petal_width_cm",
                      "species",
                  ],
              )
          )

      defs = Definitions(assets=[iris_dataset], resources=resources)

  If you do not provide a schema, Dagster determines one from the assets and ops that use the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified with a “schema” entry in the output metadata. If none of these is provided, the schema defaults to “public”. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

      from dagster import Out, op

      @op(
          out={"my_table": Out(metadata={"schema": "my_schema"})}
      )
      def make_my_table() -> da.DataFrame:
          ...

  To use only specific columns of a table as input to a downstream op or asset, add the “columns” metadata to the In or AssetIn.

      from dagster import AssetIn, asset

      @asset(
          ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
      )
      def my_table_a(my_table: da.DataFrame):
          # my_table will just contain the data from column "a"
          ...
- dagster_iceberg.io_manager.pandas.PandasIcebergIOManager IOManagerDefinition [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using pandas.

  Examples:

      import pandas as pd
      from dagster import Definitions, asset
      from dagster_iceberg.config import IcebergCatalogConfig
      from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager

      CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
      CATALOG_WAREHOUSE = (
          "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
      )

      resources = {
          "io_manager": PandasIcebergIOManager(
              name="test",
              config=IcebergCatalogConfig(
                  properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
              ),
              namespace="dagster",
          )
      }

      @asset
      def iris_dataset() -> pd.DataFrame:
          return pd.read_csv(
              "https://docs.dagster.io/assets/iris.csv",
              names=[
                  "sepal_length_cm",
                  "sepal_width_cm",
                  "petal_length_cm",
                  "petal_width_cm",
                  "species",
              ],
          )

      defs = Definitions(assets=[iris_dataset], resources=resources)

  If you do not provide a schema, Dagster determines one from the assets and ops that use the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified with a “schema” entry in the output metadata. If none of these is provided, the schema defaults to “public”. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

      from dagster import Out, op

      @op(
          out={"my_table": Out(metadata={"schema": "my_schema"})}
      )
      def make_my_table() -> pd.DataFrame:
          ...

  To use only specific columns of a table as input to a downstream op or asset, add the “columns” metadata to the In or AssetIn.

      from dagster import AssetIn, asset

      @asset(
          ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
      )
      def my_table_a(my_table: pd.DataFrame):
          # my_table will just contain the data from column "a"
          ...
- dagster_iceberg.io_manager.polars.PolarsIcebergIOManager IOManagerDefinition [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using Polars.

  Examples:

      import polars as pl
      from dagster import Definitions, asset
      from dagster_iceberg.config import IcebergCatalogConfig
      from dagster_iceberg.io_manager.polars import PolarsIcebergIOManager

      CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
      CATALOG_WAREHOUSE = (
          "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
      )

      resources = {
          "io_manager": PolarsIcebergIOManager(
              name="test",
              config=IcebergCatalogConfig(
                  properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
              ),
              namespace="dagster",
          )
      }

      @asset
      def iris_dataset() -> pl.DataFrame:
          return pl.read_csv(
              "https://docs.dagster.io/assets/iris.csv",
              has_header=False,
              new_columns=[
                  "sepal_length_cm",
                  "sepal_width_cm",
                  "petal_length_cm",
                  "petal_width_cm",
                  "species",
              ],
          )

      defs = Definitions(assets=[iris_dataset], resources=resources)

  If you do not provide a schema, Dagster determines one from the assets and ops that use the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified with a “schema” entry in the output metadata. If none of these is provided, the schema defaults to “public”. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

      from dagster import Out, op

      @op(
          out={"my_table": Out(metadata={"schema": "my_schema"})}
      )
      def make_my_table() -> pl.DataFrame:
          ...

  To use only specific columns of a table as input to a downstream op or asset, add the “columns” metadata to the In or AssetIn.

      from dagster import AssetIn, asset

      @asset(
          ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
      )
      def my_table_a(my_table: pl.DataFrame):
          # my_table will just contain the data from column "a"
          ...
- dagster_iceberg.io_manager.spark.SparkIcebergIOManager IOManagerDefinition [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using PySpark. This I/O manager is designed to work only with Spark Connect.

  Example:

      from dagster import Definitions, asset
      from dagster_iceberg.io_manager.spark import SparkIcebergIOManager
      from pyspark.sql import SparkSession
      from pyspark.sql.connect.dataframe import DataFrame

      resources = {
          "io_manager": SparkIcebergIOManager(
              catalog_name="test",
              namespace="dagster",
              # Spark Connect URLs use the sc:// scheme, matching the session below.
              remote_url="sc://localhost",
          )
      }

      @asset
      def iris_dataset() -> DataFrame:
          spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
          return spark.read.csv(
              "https://docs.dagster.io/assets/iris.csv",
              schema=(
                  "sepal_length_cm FLOAT, "
                  "sepal_width_cm FLOAT, "
                  "petal_length_cm FLOAT, "
                  "petal_width_cm FLOAT, "
                  "species STRING"
              ),
          )

      defs = Definitions(assets=[iris_dataset], resources=resources)
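Because this I/O manager targets Spark Connect, the remote URL is expected to be a Spark Connect connection string, which uses the `sc://` scheme (as in the `SparkSession.builder.remote` call in the example). The following is a minimal sketch of a pre-flight check you could run before constructing the resource. The helper name `is_spark_connect_url` is hypothetical and not part of dagster-iceberg or PySpark.

```python
from urllib.parse import urlsplit


def is_spark_connect_url(url: str) -> bool:
    """Hypothetical check: True if the URL looks like sc://host[:port]."""
    parts = urlsplit(url)
    # Spark Connect connection strings use the "sc" scheme and need a host.
    return parts.scheme == "sc" and bool(parts.hostname)
```

A check like this catches a classic Spark master URL such as `spark://localhost` early, before a session is created.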
Resources
- dagster_iceberg.resource.IcebergTableResource ResourceDefinition [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  Resource for interacting with a PyIceberg table.

  Example:

      from dagster import Definitions, asset
      from dagster_iceberg import IcebergTableResource
      from dagster_iceberg.config import IcebergCatalogConfig

      @asset
      def my_table(iceberg_table: IcebergTableResource):
          # Load the Iceberg table and convert it to a pandas DataFrame.
          df = iceberg_table.load().to_pandas()

      warehouse_path = "/path/to/warehouse"

      defs = Definitions(
          assets=[my_table],
          resources={
              "iceberg_table": IcebergTableResource(
                  name="my_catalog",
                  config=IcebergCatalogConfig(
                      properties={
                          "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                          "warehouse": f"file://{warehouse_path}",
                      }
                  ),
                  table="my_table",
                  namespace="my_namespace",
              )
          },
      )
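The `uri` and `warehouse` values in the example are built from an absolute path, which is why the SQLite URI ends up with four slashes after `sqlite:` and the warehouse URI with three after `file:`. The following is a small sketch of that construction. The helper name `catalog_properties` and the default database file name are illustrative assumptions, not part of dagster-iceberg.

```python
from pathlib import PurePosixPath


def catalog_properties(
    warehouse_path: str, db_name: str = "pyiceberg_catalog.db"
) -> dict:
    """Hypothetical helper: build catalog properties from an absolute POSIX path."""
    warehouse = PurePosixPath(warehouse_path)
    if not warehouse.is_absolute():
        raise ValueError("warehouse_path must be an absolute path")
    return {
        # "sqlite:///" plus an absolute path yields four slashes in total.
        "uri": f"sqlite:///{warehouse / db_name}",
        # "file://" plus an absolute path yields three slashes in total.
        "warehouse": f"file://{warehouse}",
    }
```

The returned dictionary has the same shape as the `properties` argument passed to `IcebergCatalogConfig` in the example.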
Config
- class dagster_iceberg.config.IcebergCatalogConfig [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  Configuration for Iceberg catalogs. See the Catalogs section for configuration options.

  You can configure the Iceberg I/O manager:
  - Using a .pyiceberg.yaml configuration file.
  - Through environment variables.
  - Using the IcebergCatalogConfig configuration object.

  For more information about the first two configuration options, see Setting Configuration Values.

  Example:

      from dagster_iceberg.config import IcebergCatalogConfig
      from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

      warehouse_path = "/path/to/warehouse"

      io_manager = PyArrowIcebergIOManager(
          name="my_catalog",
          config=IcebergCatalogConfig(
              properties={
                  "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                  "warehouse": f"file://{warehouse_path}",
              }
          ),
          namespace="my_namespace",
      )
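For the environment-variable option, PyIceberg documents a naming convention in which variables start with `PYICEBERG_`, a double underscore marks a nested field, and a single underscore stands for a dash. The following is a sketch of how catalog properties map onto such variable names. The helper `catalog_env_vars` is hypothetical, and it does not handle catalog names that contain underscores or dashes.

```python
from typing import Dict


def catalog_env_vars(catalog_name: str, properties: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical helper: map catalog properties to PyIceberg-style env var names."""
    prefix = f"PYICEBERG_CATALOG__{catalog_name.upper()}__"
    env = {}
    for key, value in properties.items():
        # Nested fields ("." in the key) become "__"; dashes become "_".
        env_key = key.replace(".", "__").replace("-", "_").upper()
        env[prefix + env_key] = value
    return env
```

For a catalog named `default`, the property `uri` maps to `PYICEBERG_CATALOG__DEFAULT__URI`, and `s3.access-key-id` maps to `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`.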
Base Classes
- class dagster_iceberg.io_manager.base.IcebergIOManager [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  Base class for an I/O manager definition that reads inputs from and writes outputs to Iceberg tables.

  Examples:

      import pandas as pd
      import pyarrow as pa
      from dagster import Definitions, asset
      from dagster_iceberg.config import IcebergCatalogConfig
      from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

      CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
      CATALOG_WAREHOUSE = (
          "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
      )

      resources = {
          "io_manager": PyArrowIcebergIOManager(
              name="test",
              config=IcebergCatalogConfig(
                  properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
              ),
              namespace="dagster",
          )
      }

      @asset
      def iris_dataset() -> pa.Table:
          # Return the table so the I/O manager can write it to Iceberg.
          return pa.Table.from_pandas(
              pd.read_csv(
                  "https://docs.dagster.io/assets/iris.csv",
                  names=[
                      "sepal_length_cm",
                      "sepal_width_cm",
                      "petal_length_cm",
                      "petal_width_cm",
                      "species",
                  ],
              )
          )

      defs = Definitions(assets=[iris_dataset], resources=resources)

  If you do not provide a schema, Dagster determines one from the assets and ops that use the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified with a “schema” entry in the output metadata. If none of these is provided, the schema defaults to “public”. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

      from dagster import Out, op

      @op(
          out={"my_table": Out(metadata={"schema": "my_schema"})}
      )
      def make_my_table() -> pa.Table:
          ...

  To use only specific columns of a table as input to a downstream op or asset, add the “columns” metadata to the In or AssetIn.

      from dagster import AssetIn, asset

      @asset(
          ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
      )
      def my_table_a(my_table: pa.Table):
          # my_table will just contain the data from column "a"
          ...
- class dagster_iceberg.handler.IcebergBaseTypeHandler [source]
- Preview: This API is currently in preview and may have breaking changes in patch version releases. It is not considered ready for production use.

  Base class for a type handler that reads inputs from and writes outputs to Iceberg tables.