Skip to main content

But First, Semantics: Upsert versus Patch

Why Would You Use Patch

By default, most of the SDK tutorials and API-s involve applying full upserts at the aspect level. This means that typically, when you want to change one field within an aspect without modifying others, you need to do a read-modify-write to not overwrite existing fields. To support these scenarios, DataHub supports PATCH based operations so that targeted changes to single fields or values within arrays of fields are possible without impacting other existing metadata.

note

Currently, PATCH support is only available for a selected set of aspects, so before pinning your hopes on using PATCH as a way to make modifications to aspect values, confirm whether your aspect supports PATCH semantics. The complete list of Aspects that are supported are maintained here. In the near future, we do have plans to automatically support PATCH semantics for aspects by default.

How To Use Patch

Examples for using Patch are sprinkled throughout the API guides. Here's how to find the appropriate classes for the language for your choice.

The Python Patch builders are entity-oriented and located in the metadata-ingestion module and located in the datahub.specific module.

Here are a few illustrative examples using the Python Patch builders:

Add Properties to Dataset

# Inlined from /metadata-ingestion/examples/library/dataset_add_properties.py
import logging
from typing import Union

from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


# Get an emitter, either REST or Kafka, this example shows you both
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
USE_REST_EMITTER = True
if USE_REST_EMITTER:
gms_endpoint = "http://localhost:8080"
return DataHubRestEmitter(gms_server=gms_endpoint)
else:
kafka_server = "localhost:9092"
schema_registry_url = "http://localhost:8081"
return DatahubKafkaEmitter(
config=KafkaEmitterConfig(
connection=KafkaProducerConnectionConfig(
bootstrap=kafka_server, schema_registry_url=schema_registry_url
)
)
)


dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")

with get_emitter() as emitter:
for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.add_custom_property("cluster_name", "datahubproject.acryl.io")
.add_custom_property("retention_time", "2 years")
.build()
):
emitter.emit(patch_mcp)


log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")