Databricks
DataHub integrates with the Databricks ecosystem through several connectors; which one you use depends on your setup.
Databricks Hive
The simplest way to integrate is usually via the Hive connector. The Hive starter recipe has a section describing how to connect to your Databricks workspace.
Databricks Unity Catalog (new)
The recently introduced Unity Catalog provides a new way to govern your assets within the Databricks lakehouse. If you have enabled Unity Catalog, you can use the unity-catalog source (see below) to integrate your metadata into DataHub as an alternative to the Hive pathway.
Databricks Spark
To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions here.
Watch the DataHub Talk at the Data and AI Summit 2022
For a deeper look at how to think about DataHub within and across your Databricks ecosystem, watch the recording of our talk at the Data and AI Summit 2022.
Important Capabilities
Capability | Status | Notes |
---|---|---|
Asset Containers | ✅ | Enabled by default |
Column-level Lineage | ✅ | Enabled by default |
Dataset Usage | ✅ | Enabled by default |
Descriptions | ✅ | Enabled by default |
Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata |
Domains | ✅ | Supported via the domain config field |
Extract Ownership | ✅ | Supported via the include_ownership config |
Platform Instance | ✅ | Enabled by default |
Schema Metadata | ✅ | Enabled by default |
Table-Level Lineage | ✅ | Enabled by default |
This plugin extracts the following metadata from Databricks Unity Catalog:
- metastores
- schemas
- tables and column lineage
Prerequisites
- Get your Databricks instance's workspace URL
- Create a Databricks Service Principal
  - You can skip this step and use your own account to get things running quickly, but we strongly recommend creating a dedicated service principal for production use.
- Generate a Databricks personal access token, following the Databricks guides.
- Provision your service account:
  - To ingest your workspace's metadata and lineage, your service principal must have all of the following:
    - One of: metastore admin role, ownership of, or USE CATALOG privilege on any catalogs you want to ingest
    - One of: metastore admin role, ownership of, or USE SCHEMA privilege on any schemas you want to ingest
    - Ownership of or SELECT privilege on any tables and views you want to ingest
    - Ownership documentation
    - Privileges documentation
  - To include_usage_statistics (enabled by default), your service principal must have CAN_MANAGE permissions on any SQL Warehouses you want to ingest: guide.
  - To ingest profiling information with call_analyze (enabled by default), your service principal must have ownership or MODIFY privilege on any tables you want to profile.
    - Alternatively, you can run ANALYZE TABLE yourself on any tables you want to profile, then set call_analyze to false. You will still need SELECT privilege on those tables to fetch the results.
- Check the starter recipe below and replace workspace_url and token with your information from the previous steps.
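The table-level privileges listed above can be granted with SQL. The helper below is a minimal sketch that only builds the statement text; it assumes the standard Databricks SQL GRANT form, and the function name, the principal name, and the object names are hypothetical. It does not cover the CAN_MANAGE permission on SQL Warehouses, which is not granted through SQL. Check the generated statements against the Privileges documentation before running them.

```python
def build_grants(principal, catalogs=(), schemas=(), tables=(), profile_tables=()):
    """Build GRANT statements for the privileges the connector needs.

    All arguments are illustrative: `principal` is the service principal,
    `schemas` are `catalog.schema` names, and `tables` / `profile_tables`
    are `catalog.schema.table` names.
    """
    who = f"`{principal}`"  # backticks quote the principal name
    statements = []
    for catalog in catalogs:
        statements.append(f"GRANT USE CATALOG ON CATALOG {catalog} TO {who}")
    for schema in schemas:
        statements.append(f"GRANT USE SCHEMA ON SCHEMA {schema} TO {who}")
    for table in tables:
        statements.append(f"GRANT SELECT ON TABLE {table} TO {who}")
    # MODIFY is only needed when profiling with call_analyze enabled.
    for table in profile_tables:
        statements.append(f"GRANT MODIFY ON TABLE {table} TO {who}")
    return statements


grants = build_grants(
    "datahub-ingestion",  # hypothetical service principal name
    catalogs=["main"],
    schemas=["main.lineagedemo"],
    tables=["main.lineagedemo.dinner"],
)
for statement in grants:
    print(statement)
```

Granting at the catalog or schema level where possible keeps the statement list short; the per-table form is shown only because the requirements above are stated per object.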
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[unity-catalog]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: unity-catalog
  config:
    workspace_url: https://my-workspace.cloud.databricks.com
    token: "mygenerated_databricks_token"
    #metastore_id_pattern:
    #  deny:
    #    - 11111-2222-33333-44-555555
    #catalog_pattern:
    #  allow:
    #    - my-catalog
    #schema_pattern:
    #  deny:
    #    - information_schema
    #table_pattern:
    #  allow:
    #    - test.lineagedemo.dinner
    # First you have to create domains on DataHub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions
    #domain:
    #  urn:li:domain:1111-222-333-444-555:
    #    allow:
    #      - main.*

    stateful_ingestion:
      enabled: true

pipeline_name: acme-corp-unity

# sink configs if needed
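The same recipe can also be run from Python instead of the CLI. The following is a minimal sketch, assuming the acryl-datahub package with the unity-catalog plugin is installed and that a DataHub REST endpoint is reachable; the helper names, the default server URL, and the environment variable names in the usage comment are assumptions for illustration.

```python
def build_recipe(workspace_url, token, server="http://localhost:8080"):
    """Return the starter recipe as a dict, with a datahub-rest sink added.

    `server` is an assumed DataHub endpoint; replace it with your own.
    """
    return {
        "pipeline_name": "acme-corp-unity",
        "source": {
            "type": "unity-catalog",
            "config": {
                "workspace_url": workspace_url,
                "token": token,
                "stateful_ingestion": {"enabled": True},
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }


def run_recipe(recipe):
    """Run the recipe and raise if the pipeline reports failures."""
    # Imported lazily so that build_recipe works without the plugin installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()


# Usage (the environment variable names are hypothetical):
#   import os
#   run_recipe(build_recipe(os.environ["DATABRICKS_WORKSPACE_URL"],
#                           os.environ["DATABRICKS_TOKEN"]))
```

Reading the token from the environment rather than hard-coding it keeps the personal access token out of version control.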
Config Details
Note that a . is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
token ✅ string | Databricks personal access token |
workspace_url ✅ string | Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com |
bucket_duration Enum | Size of the time window to aggregate usage stats. Default: DAY |
end_time string(date-time) | Latest date of usage to consider. Default: Current time in UTC |
format_sql_queries boolean | Whether to format SQL queries. Default: False |
include_column_lineage boolean | Option to enable/disable column-level lineage generation. Column-level lineage currently requires one REST call per column because of the Databricks API, which can slow down ingestion. Default: True |
include_operational_stats boolean | Whether to display operational stats. Default: True |
include_ownership boolean | Option to enable/disable ownership generation for metastores, catalogs, schemas, and tables. Default: False |
include_read_operational_stats boolean | Whether to report read operational stats. Experimental. Default: False |
include_table_lineage boolean | Option to enable/disable lineage generation. Default: True |
include_top_n_queries boolean | Whether to ingest the top_n_queries. Default: True |
include_usage_statistics boolean | Generate usage statistics. Default: True |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to |
start_time string(date-time) | Earliest date of usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration ) |
store_last_profiling_timestamps boolean | Enable storing last profile timestamp in store. Default: False |
top_n_queries integer | Number of top queries to save to each table. Default: 10 |
workspace_name string | Name of the workspace. Defaults to the deployment name present in workspace_url |
env string | The environment that all assets produced by this connector belong to. Default: PROD |
catalog_pattern AllowDenyPattern | Regex patterns for catalogs to filter in ingestion. Specify regex to match the full metastore.catalog name. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
catalog_pattern.allow array(string) | |
catalog_pattern.deny array(string) | |
catalog_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
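domain map(str,AllowDenyPattern) | Attach domains to catalogs, schemas or tables during ingestion using regex patterns. The domain key can be a guid like urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba or a string like "Marketing". If you provide strings, DataHub will attempt to resolve the name to a guid and will error out if this fails. Multiple domain keys can be specified. |
domain.key.allow array(string) | |
domain.key.deny array(string) | |
domain.key.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |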
schema_pattern AllowDenyPattern | Regex patterns for schemas to filter in ingestion. Specify regex to match the full metastore.catalog.schema name. e.g. to match all tables in schema analytics, use the regex ^mymetastore\.mycatalog\.analytics$ . Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
schema_pattern.allow array(string) | |
schema_pattern.deny array(string) | |
schema_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
table_pattern AllowDenyPattern | Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in catalog.schema.table format. e.g. to match all tables starting with customer in Customer catalog and public schema, use the regex Customer\.public\.customer.* . Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
table_pattern.allow array(string) | |
table_pattern.deny array(string) | |
table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
user_email_pattern AllowDenyPattern | Regex patterns for user emails to filter in usage. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
user_email_pattern.allow array(string) | |
user_email_pattern.deny array(string) | |
user_email_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
profiling UnityCatalogProfilerConfig | Data profiling configuration Default: {'enabled': False, 'warehouse_id': None, 'profile_... |
profiling.call_analyze boolean | Whether to call ANALYZE TABLE as part of profile ingestion. If false, will ingest the results of the most recent ANALYZE TABLE call, if any. Default: True |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.max_wait_secs integer | Maximum time to wait for an ANALYZE TABLE query to complete. Default: 3600 |
profiling.max_workers integer | Number of worker threads to use for profiling. Set to 1 to disable. Default: 20 |
profiling.profile_table_level_only boolean | Whether to perform profiling at table-level only or include column-level profiling as well. Default: False |
profiling.warehouse_id string | SQL Warehouse id, for running profiling queries. |
profiling.pattern AllowDenyPattern | Regex patterns to filter tables for profiling during ingestion. Specify regex to match the catalog.schema.table format. Note that only tables allowed by the table_pattern will be considered. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
profiling.pattern.allow array(string) | |
profiling.pattern.deny array(string) | |
profiling.pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Unity Catalog Stateful Ingestion Config. |
stateful_ingestion.enabled boolean | Whether stateful ingestion is enabled. Default: False |
stateful_ingestion.ignore_new_state boolean | If set to True, ignores the current checkpoint state. Default: False |
stateful_ingestion.ignore_old_state boolean | If set to True, ignores the previous checkpoint state. Default: False |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
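The allow/deny pattern fields in the table above all share the same shape. The sketch below approximates how such a pattern could be evaluated; it is not the connector's implementation. It assumes that patterns are matched from the start of the name, that a deny match takes precedence over an allow match, and that ignoreCase maps to a case-insensitive regex flag. The function name is hypothetical.

```python
import re


def is_allowed(name, allow=(".*",), deny=(), ignore_case=True):
    """Approximate an AllowDenyPattern check for a fully qualified name.

    Assumption: a name passes when no deny pattern matches and at least
    one allow pattern matches, with matching anchored at the start.
    """
    flags = re.IGNORECASE if ignore_case else 0
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)


# The table_pattern example from the table above.
customer_tables = [r"Customer\.public\.customer.*"]
print(is_allowed("Customer.public.customer_orders", allow=customer_tables))
print(is_allowed("Customer.public.invoices", allow=customer_tables))
```

With the defaults (allow everything, deny nothing), every name passes, which matches the default value shown for each pattern field.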
The JSONSchema for this configuration is inlined below.
{
"title": "UnityCatalogSourceConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"store_last_profiling_timestamps": {
"title": "Store Last Profiling Timestamps",
"description": "Enable storing last profile timestamp in store.",
"default": false,
"type": "boolean"
},
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"bucket_duration": {
"description": "Size of the time window to aggregate usage stats.",
"default": "DAY",
"allOf": [
{
"$ref": "#/definitions/BucketDuration"
}
]
},
"end_time": {
"title": "End Time",
"description": "Latest date of usage to consider. Default: Current time in UTC",
"type": "string",
"format": "date-time"
},
"start_time": {
"title": "Start Time",
"description": "Earliest date of usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`)",
"type": "string",
"format": "date-time"
},
"top_n_queries": {
"title": "Top N Queries",
"description": "Number of top queries to save to each table.",
"default": 10,
"exclusiveMinimum": 0,
"type": "integer"
},
"user_email_pattern": {
"title": "User Email Pattern",
"description": "regex patterns for user emails to filter in usage.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"include_operational_stats": {
"title": "Include Operational Stats",
"description": "Whether to display operational stats.",
"default": true,
"type": "boolean"
},
"include_read_operational_stats": {
"title": "Include Read Operational Stats",
"description": "Whether to report read operational stats. Experimental.",
"default": false,
"type": "boolean"
},
"format_sql_queries": {
"title": "Format Sql Queries",
"description": "Whether to format sql queries",
"default": false,
"type": "boolean"
},
"include_top_n_queries": {
"title": "Include Top N Queries",
"description": "Whether to ingest the top_n_queries.",
"default": true,
"type": "boolean"
},
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "Unity Catalog Stateful Ingestion Config.",
"allOf": [
{
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
}
]
},
"token": {
"title": "Token",
"description": "Databricks personal access token",
"type": "string"
},
"workspace_url": {
"title": "Workspace Url",
"description": "Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com",
"type": "string"
},
"workspace_name": {
"title": "Workspace Name",
"description": "Name of the workspace. Default to deployment name present in workspace_url",
"type": "string"
},
"catalog_pattern": {
"title": "Catalog Pattern",
"description": "Regex patterns for catalogs to filter in ingestion. Specify regex to match the full `metastore.catalog` name.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"schema_pattern": {
"title": "Schema Pattern",
"description": "Regex patterns for schemas to filter in ingestion. Specify regex to the full `metastore.catalog.schema` name. e.g. to match all tables in schema analytics, use the regex `^mymetastore\\.mycatalog\\.analytics$`.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in `catalog.schema.table` format. e.g. to match all tables starting with customer in Customer catalog and public schema, use the regex `Customer\\.public\\.customer.*`.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"domain": {
"title": "Domain",
"description": "Attach domains to catalogs, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like \"Marketing\".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/AllowDenyPattern"
}
},
"include_table_lineage": {
"title": "Include Table Lineage",
"description": "Option to enable/disable lineage generation.",
"default": true,
"type": "boolean"
},
"include_ownership": {
"title": "Include Ownership",
"description": "Option to enable/disable ownership generation for metastores, catalogs, schemas, and tables.",
"default": false,
"type": "boolean"
},
"include_column_lineage": {
"title": "Include Column Lineage",
"description": "Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
"default": true,
"type": "boolean"
},
"include_usage_statistics": {
"title": "Include Usage Statistics",
"description": "Generate usage statistics.",
"default": true,
"type": "boolean"
},
"profiling": {
"title": "Profiling",
"description": "Data profiling configuration",
"default": {
"enabled": false,
"warehouse_id": null,
"profile_table_level_only": false,
"pattern": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"call_analyze": true,
"max_wait_secs": 3600,
"max_workers": 20
},
"allOf": [
{
"$ref": "#/definitions/UnityCatalogProfilerConfig"
}
]
}
},
"required": [
"token",
"workspace_url"
],
"additionalProperties": false,
"definitions": {
"BucketDuration": {
"title": "BucketDuration",
"description": "An enumeration.",
"enum": [
"DAY",
"HOUR"
],
"type": "string"
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "The type of the ingestion state provider registered with datahub.",
"default": false,
"type": "boolean"
},
"ignore_old_state": {
"title": "Ignore Old State",
"description": "If set to True, ignores the previous checkpoint state.",
"default": false,
"type": "boolean"
},
"ignore_new_state": {
"title": "Ignore New State",
"description": "If set to True, ignores the current checkpoint state.",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"UnityCatalogProfilerConfig": {
"title": "UnityCatalogProfilerConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"warehouse_id": {
"title": "Warehouse Id",
"description": "SQL Warehouse id, for running profiling queries.",
"type": "string"
},
"profile_table_level_only": {
"title": "Profile Table Level Only",
"description": "Whether to perform profiling at table-level only or include column-level profiling as well.",
"default": false,
"type": "boolean"
},
"pattern": {
"title": "Pattern",
"description": "Regex patterns to filter tables for profiling during ingestion. Specify regex to match the `catalog.schema.table` format. Note that only tables allowed by the `table_pattern` will be considered.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"call_analyze": {
"title": "Call Analyze",
"description": "Whether to call ANALYZE TABLE as part of profile ingestion.If false, will ingest the results of the most recent ANALYZE TABLE call, if any.",
"default": true,
"type": "boolean"
},
"max_wait_secs": {
"title": "Max Wait Secs",
"description": "Maximum time to wait for an ANALYZE TABLE query to complete.",
"default": 3600,
"type": "integer"
},
"max_workers": {
"title": "Max Workers",
"description": "Number of worker threads to use for profiling. Set to 1 to disable.",
"default": 20,
"type": "integer"
}
},
"additionalProperties": false
}
}
}
Troubleshooting
No data lineage captured or missing lineage
Check that you meet the Unity Catalog lineage requirements.
Also check the Unity Catalog limitations to make sure that lineage would be expected to exist in this case.
Lineage extraction is too slow
Currently, there is no way to get table or column lineage in bulk from the Databricks Unity Catalog REST API. Table lineage requires one API call per table, and column lineage requires one API call per column. If metadata extraction is taking too long, you can turn off column-level lineage extraction via the include_column_lineage config flag.
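To get a feel for the cost, the call counts described above can be estimated before running ingestion. This is a rough sketch with a hypothetical helper and made-up table sizes; it counts one call per table and one per column and ignores retries, pagination, and any other requests the connector makes.

```python
def estimate_lineage_calls(columns_per_table,
                           include_table_lineage=True,
                           include_column_lineage=True):
    """Estimate lineage API calls from a {table_name: column_count} mapping."""
    # One call per table for table-level lineage.
    table_calls = len(columns_per_table) if include_table_lineage else 0
    # One call per column for column-level lineage.
    column_calls = sum(columns_per_table.values()) if include_column_lineage else 0
    return table_calls + column_calls


# Hypothetical workspace: three tables with 12, 40, and 8 columns.
sizes = {"main.sales.orders": 12, "main.sales.items": 40, "main.sales.stores": 8}
print(estimate_lineage_calls(sizes))                                # 63
print(estimate_lineage_calls(sizes, include_column_lineage=False))  # 3
```

For wide tables the column term dominates, which is why disabling include_column_lineage is the first thing to try.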
Code Coordinates
- Class Name: datahub.ingestion.source.unity.source.UnityCatalogSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Databricks, feel free to ping us on our Slack.