Cluster
This module contains functions for working with Databricks clusters.
cluster_name_exists
cluster_name_exists(name: str)-> bool
Parameter | Description |
---|---|
name | Name of the cluster. Note that more than one cluster with the same name can exist within a given Databricks workspace, since clusters are uniquely identified by their cluster_id and not their name. |
A predicate function that can be used to check if a cluster with a given name already exists.
cluster_get_by_name
cluster_get_by_name(name: str)-> list
Parameter | Description |
---|---|
name | Name of the cluster. Note that more than one cluster with the same name can exist within a given Databricks workspace, since clusters are uniquely identified by their cluster_id and not their name. |
Returns a list of cluster details that have a matching name. Because more than one cluster with the same name can exist within a given Databricks workspace (clusters are uniquely identified by their cluster_id, not their name), the function returns a list of cluster objects. The response dictionary for a cluster object is described in the API documentation.
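Because names are not unique, a lookup by name is naturally a filter rather than a single fetch. The sketch below illustrates the idea on plain dictionaries. `filter_by_name` and the sample data are hypothetical stand-ins, not this module's implementation, and only the `cluster_id` and `cluster_name` keys are assumed.

```python
from typing import Dict, List


def filter_by_name(clusters: List[Dict], name: str) -> List[Dict]:
    """Return every cluster dict whose cluster_name equals name (hypothetical helper)."""
    return [c for c in clusters if c.get("cluster_name") == name]


# Made-up sample data: two clusters share a name but have different ids.
sample_clusters = [
    {"cluster_id": "0101-aaa", "cluster_name": "etl"},
    {"cluster_id": "0102-bbb", "cluster_name": "etl"},
    {"cluster_id": "0103-ccc", "cluster_name": "adhoc"},
]

matches = filter_by_name(sample_clusters, "etl")
# The kind of check a predicate such as cluster_name_exists would make.
name_exists = len(matches) > 0
print([c["cluster_id"] for c in matches])
```

Callers that expect a single cluster should therefore check the length of the returned list before indexing into it.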
cluster_action
cluster_action(
cluster_id: str,
cluster_action: ClusterAction
) -> dict
Executes a cluster action for a given cluster using the cluster_id. The ClusterAction enum can be imported from the cluster module. The function is an asynchronous call and will not wait for the operation to complete. Note that a cluster must be unpinned before it can be deleted.
Note that there are higher-level functions in this module that wait for a cluster to be running or to reach a given state; please see:
clusters_create
cluster_wait_until_state
Cluster actions are fairly self-explanatory. Pinned clusters remain in the workspace UI list. Unpinned clusters are removed from the list 30 days after they are terminated.
from cluster import ClusterAction

# List every available action.
for a in ClusterAction:
    print(a)

# Output:
# ClusterAction.PIN
# ClusterAction.UNPIN
# ClusterAction.START
# ClusterAction.RESTART
# ClusterAction.STOP
# ClusterAction.DELETE
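Since a cluster must be unpinned before it can be deleted, a caller typically issues the two actions in order. The following is a minimal sketch of that ordering only. The `ClusterAction` class here is a local stand-in that mirrors the member names listed above (its values are made up), and `unpin_then_delete` and `fake_cluster_action` are hypothetical helpers so the example runs without a workspace.

```python
from enum import Enum
from typing import Callable, List


class ClusterAction(Enum):
    """Local stand-in mirroring the documented member names; values are made up."""
    PIN = "pin"
    UNPIN = "unpin"
    START = "start"
    RESTART = "restart"
    STOP = "stop"
    DELETE = "delete"


def unpin_then_delete(
    cluster_id: str,
    action_fn: Callable[[str, ClusterAction], dict],
) -> List[dict]:
    """Issue UNPIN before DELETE and collect the responses (hypothetical helper)."""
    responses = []
    for action in (ClusterAction.UNPIN, ClusterAction.DELETE):
        responses.append(action_fn(cluster_id, action))
    return responses


# A fake action function that records calls instead of contacting a workspace.
calls = []


def fake_cluster_action(cluster_id: str, action: ClusterAction) -> dict:
    calls.append((cluster_id, action))
    return {}


unpin_then_delete("0101-aaa", fake_cluster_action)
print([action.name for _, action in calls])
```

In real use the function passed as `action_fn` would be the module's own cluster_action, assuming it accepts the cluster id and the action in that order as its signature above suggests.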
clusters_create
Creates clusters defined in YAML files in the cluster_defn_folder. The cluster definition structure follows the JSON schema defined for the API but is written in YAML for easier coding. See cluster_create.
clusters_create(
cluster_defn_folder: str,
pin: bool = True,
stop: bool = True,
delete_if_exists: bool = False,
allow_duplicate_names: bool = False,
init_script_path: str = None,
)
Parameter | Description |
---|---|
cluster_defn_folder | Path of the folder of YAML files that hold the cluster definitions. The YAML supports variable replacement; see the example below. |
pin | Whether the cluster should be pinned in the UI. |
stop | By default Databricks starts a cluster when it is created. Use this parameter to stop the cluster from starting on creation. |
delete_if_exists | If the cluster name already exists, unpin and delete the existing cluster. |
allow_duplicate_names | Clusters are uniquely identified by an id; use this parameter if you wish to enforce unique names. Note that if this is False, the name already exists, and delete_if_exists is False, then the creation will fail. |
init_script_path | Path to an init shell script that you may want to deploy with the cluster. Ensure that init_scripts[].dbfs.destination is defined in the cluster definition. |
cluster_create
Creates a cluster defined in a YAML file at the cluster_defn_path. The cluster definition structure follows the JSON schema defined for the API but is written in YAML for easier coding. To create clusters from a whole folder of definitions, see clusters_create.
cluster_create(
cluster_defn_path: str,
pin: bool = True,
stop: bool = True,
delete_if_exists: bool = False,
allow_duplicate_names: bool = False,
init_script_path: str = None,
) -> dict
Parameter | Description |
---|---|
cluster_defn_path | Path of the YAML file that holds the definition. The YAML supports variable replacement; see the example below. |
pin | Whether the cluster should be pinned in the UI. |
stop | By default Databricks starts a cluster when it is created. Use this parameter to stop the cluster from starting on creation. |
delete_if_exists | If the cluster name already exists, unpin and delete the existing cluster. |
allow_duplicate_names | Clusters are uniquely identified by an id; use this parameter if you wish to enforce unique names. Note that if this is False, the name already exists, and delete_if_exists is False, then the creation will fail. |
init_script_path | Path to an init shell script that you may want to deploy with the cluster. Ensure that init_scripts[].dbfs.destination is defined in the cluster definition. |
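The interaction between delete_if_exists and allow_duplicate_names can be read as a small decision table. The sketch below encodes only the rules stated in the parameter descriptions. `plan_creation` is a hypothetical helper, and giving delete_if_exists precedence when both flags are True is an assumption rather than documented behaviour.

```python
def plan_creation(name_exists: bool, delete_if_exists: bool, allow_duplicate_names: bool) -> str:
    """Return 'create' or 'replace', or raise in the documented failure case.

    Hypothetical helper: it only mirrors the parameter descriptions above.
    """
    if not name_exists:
        # No clash, so the flags are irrelevant.
        return "create"
    if delete_if_exists:
        # Assumption: deleting the existing cluster takes precedence
        # over allowing a duplicate name.
        return "replace"
    if allow_duplicate_names:
        # A second cluster with the same name is acceptable.
        return "create"
    # Name exists, duplicates are not allowed, and nothing may be deleted.
    raise ValueError("cluster name already exists and neither flag permits creation")


print(plan_creation(name_exists=True, delete_if_exists=True, allow_duplicate_names=False))
```

With the default arguments (both flags False), creating a cluster whose name is already taken is the one combination that fails.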
The cluster definition supports variable injection to help with conventional naming. For example, a file stored at {cluster_defn_path}/my_cluster.yaml will create a cluster called my_cluster_7.5 with logs and init scripts stored under a leaf folder with that name. Variables are:
Variable | Description |
---|---|
{filename} | The filename of the cluster definition. Handy for driving the name of the cluster from the definition file name. |
{dbr} | The major and minor version of the DBR runtime. |
{cluster_name} | The name of the cluster, used in the example below to create the logs and reference the init script. |
For example:
num_workers: 1
cluster_name: "{filename}_{dbr}"
spark_version: "7.5.x-cpu-ml-scala2.12"
spark_conf:
  spark.databricks.cluster.profile: "serverless"
  spark.databricks.passthrough.enabled: "true"
  spark.databricks.pyspark.enableProcessIsolation: "true"
  spark.databricks.repl.allowedLanguages: "python,sql"
azure_attributes:
  first_on_demand: 1
  availability: ON_DEMAND_AZURE
  spot_bid_max_price: -1
node_type_id: "Standard_DS3_v2"
driver_node_type_id: "Standard_DS3_v2"
ssh_public_keys:
custom_tags:
  deployed_by: "AzureDevOps"
cluster_log_conf:
  dbfs:
    destination: "dbfs:/FileStore/cluster/logs/{cluster_name}"
init_scripts:
  - dbfs:
      destination: "dbfs:/FileStore/cluster/init/{cluster_name}.sh"
spark_env_vars:
  ENVIRONMENT: "TEST"
autotermination_minutes: 30
enable_elastic_disk: true
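To make the substitution concrete, here is a minimal sketch of how the three variables could be resolved against an already-parsed definition. It is an illustration under assumptions, not the module's implementation: `resolve_variables` is a hypothetical helper, {dbr} is assumed to be the leading major.minor part of spark_version, and {filename} is assumed to be the file name without its extension (which matches the my_cluster_7.5 example).

```python
import re
from pathlib import Path
from typing import Any


def resolve_variables(defn: dict, defn_path: str) -> dict:
    """Return a copy of defn with {filename}, {dbr} and {cluster_name} filled in."""
    filename = Path(defn_path).stem  # "my_cluster.yaml" -> "my_cluster"
    match = re.match(r"(\d+\.\d+)", str(defn.get("spark_version", "")))
    dbr = match.group(1) if match else ""  # "7.5.x-cpu-ml-scala2.12" -> "7.5"

    def fill(text: str) -> str:
        return text.replace("{filename}", filename).replace("{dbr}", dbr)

    # The cluster name is resolved first so other fields can refer to it.
    cluster_name = fill(str(defn.get("cluster_name", "")))

    def substitute(value: Any) -> Any:
        # Walk nested dicts and lists, replacing variables in every string.
        if isinstance(value, str):
            return fill(value).replace("{cluster_name}", cluster_name)
        if isinstance(value, dict):
            return {key: substitute(item) for key, item in value.items()}
        if isinstance(value, list):
            return [substitute(item) for item in value]
        return value

    return substitute(defn)


# A trimmed-down version of the definition above, as it might look once parsed.
definition = {
    "cluster_name": "{filename}_{dbr}",
    "spark_version": "7.5.x-cpu-ml-scala2.12",
    "cluster_log_conf": {"dbfs": {"destination": "dbfs:/FileStore/cluster/logs/{cluster_name}"}},
    "init_scripts": [{"dbfs": {"destination": "dbfs:/FileStore/cluster/init/{cluster_name}.sh"}}],
    "autotermination_minutes": 30,
}

resolved = resolve_variables(definition, "definitions/my_cluster.yaml")
print(resolved["cluster_name"])
print(resolved["cluster_log_conf"]["dbfs"]["destination"])
```

The path `definitions/my_cluster.yaml` is made up for the example; non-string values such as autotermination_minutes pass through unchanged.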
cluster_delete_clusters
cluster_delete_clusters(clusters: list)
Deletes the clusters in the list. The list should be a list of cluster objects. For the function to delete a cluster, each object must have a cluster_id attribute; other attributes may be present. For example, the following would be valid:
[{"cluster_id":"1"}, {"cluster_id":"2"}, {"cluster_id":"3"}]
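Because only cluster_id is required, it can be worth validating the list before handing it over. The sketch below shows one way to do that. `extract_cluster_ids` is a hypothetical helper and the ids are made up.

```python
from typing import Dict, List


def extract_cluster_ids(clusters: List[Dict]) -> List[str]:
    """Collect cluster_id from each object, failing early if one is missing."""
    ids = []
    for index, cluster in enumerate(clusters):
        if "cluster_id" not in cluster:
            raise KeyError(f"cluster object at index {index} has no cluster_id")
        ids.append(cluster["cluster_id"])
    return ids


# Extra attributes such as cluster_name are simply ignored.
ids = extract_cluster_ids([{"cluster_id": "1"}, {"cluster_id": "2", "cluster_name": "etl"}])
print(ids)
```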
cluster_list
cluster_list() -> dict
Returns a list of cluster definitions.
get_cluster_state
get_cluster_state(cluster_id: str) -> ClusterState
Gets the cluster state for a given cluster_id. Note the self-explanatory ClusterState enum:
ClusterState.PENDING
ClusterState.RUNNING
ClusterState.RESTARTING
ClusterState.RESIZING
ClusterState.TERMINATING
ClusterState.TERMINATED
ClusterState.ERROR
ClusterState.UNKNOWN
cluster_is_running
cluster_is_running(cluster_id: str) -> bool
Returns True if the cluster with the given cluster_id is in the ClusterState.RUNNING state.
cluster_is_terminated
cluster_is_terminated(cluster_id: str) -> bool
Returns True if the cluster with the given cluster_id is in the ClusterState.TERMINATED state.
cluster_get
cluster_get(cluster_id: str) -> dict
Gets the cluster definition for a given cluster_id.
cluster_wait_until_state
cluster_wait_until_state(
cluster_id: str,
cluster_state: ClusterState,
wait_seconds: int = 10
)
This is a blocking call that waits until a given ClusterState is reached for a given cluster_id. wait_seconds is the interval that the call waits between checks of whether the state has been reached. WARNING: setting wait_seconds too low may cause the API limits to be exceeded.
Note the self-explanatory ClusterState enum:
ClusterState.PENDING
ClusterState.RUNNING
ClusterState.RESTARTING
ClusterState.RESIZING
ClusterState.TERMINATING
ClusterState.TERMINATED
ClusterState.ERROR
ClusterState.UNKNOWN
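The blocking behaviour amounts to a polling loop. The following is a minimal sketch of such a loop, not the module's implementation. `ClusterState` is a local stand-in mirroring the member names above (values are made up), the state lookup is injected as `get_state` so the example runs without a workspace, and `max_polls` is a hypothetical safety limit that the documented function does not have.

```python
import time
from enum import Enum
from typing import Callable, Optional


class ClusterState(Enum):
    """Local stand-in mirroring the documented member names; values are made up."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    RESTARTING = "RESTARTING"
    RESIZING = "RESIZING"
    TERMINATING = "TERMINATING"
    TERMINATED = "TERMINATED"
    ERROR = "ERROR"
    UNKNOWN = "UNKNOWN"


def wait_until_state(
    cluster_id: str,
    target: ClusterState,
    get_state: Callable[[str], ClusterState],
    wait_seconds: float = 10,
    max_polls: Optional[int] = None,
) -> ClusterState:
    """Poll get_state until target is seen, sleeping wait_seconds between checks."""
    polls = 0
    while True:
        state = get_state(cluster_id)
        polls += 1
        if state == target:
            return state
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError(f"{cluster_id} did not reach {target.name} after {polls} polls")
        # Each check is one API call, so a short interval multiplies the request rate.
        time.sleep(wait_seconds)


# Simulated state sequence standing in for real API responses.
simulated = iter([ClusterState.PENDING, ClusterState.PENDING, ClusterState.RUNNING])
final = wait_until_state("0101-aaa", ClusterState.RUNNING, lambda _id: next(simulated), wait_seconds=0)
print(final.name)
```

The `wait_seconds=0` in the usage lines is only there to keep the simulation fast; against a real workspace the default interval is the safer choice for the reason given in the warning above.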
cluster_has_tag
cluster_has_tag(
cluster: dict,
tag_key: str,
tag_value: str
) -> bool
Returns True if the given cluster definition has a tag tag_key with the value tag_value.
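As an illustration, a tag check on a cluster dictionary might look like the sketch below. It assumes that tags live under a `custom_tags` key, as in the YAML definition shown earlier; `has_tag` is a hypothetical stand-in and the sample cluster is made up.

```python
def has_tag(cluster: dict, tag_key: str, tag_value: str) -> bool:
    """Return True if cluster carries tag_key with exactly tag_value."""
    # custom_tags may be absent or empty, so fall back to an empty dict.
    tags = cluster.get("custom_tags") or {}
    return tags.get(tag_key) == tag_value


sample_cluster = {
    "cluster_id": "0101-aaa",
    "custom_tags": {"deployed_by": "AzureDevOps"},
}

print(has_tag(sample_cluster, "deployed_by", "AzureDevOps"))
```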
clusters_clear_down
clusters_clear_down(
tag_key: str = None,
tag_value: str = None,
show_only: bool = True
)
Deletes all clusters, optionally restricted to those with a certain tag_key and tag_value. show_only = True lets the caller output the clusters to check what will be deleted. Setting show_only = False will execute the deletion.
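The show_only flag is a dry-run switch, which the sketch below illustrates on in-memory data. It is not the module's implementation: `clear_down` and `delete_fn` are hypothetical, the cluster list is passed in rather than fetched, and tags are assumed to live under `custom_tags`.

```python
from typing import Callable, Dict, List, Optional


def clear_down(
    clusters: List[Dict],
    delete_fn: Callable[[str], None],
    tag_key: Optional[str] = None,
    tag_value: Optional[str] = None,
    show_only: bool = True,
) -> List[Dict]:
    """Select clusters (optionally by tag) and either list or delete them."""

    def selected(cluster: Dict) -> bool:
        if tag_key is None:
            return True  # no tag filter: every cluster is in scope
        tags = cluster.get("custom_tags") or {}
        return tags.get(tag_key) == tag_value

    targets = [c for c in clusters if selected(c)]
    for cluster in targets:
        if show_only:
            print(f"would delete {cluster['cluster_id']}")
        else:
            delete_fn(cluster["cluster_id"])
    return targets


# Made-up clusters and a fake delete function that records ids.
all_clusters = [
    {"cluster_id": "1", "custom_tags": {"deployed_by": "AzureDevOps"}},
    {"cluster_id": "2", "custom_tags": {"deployed_by": "manual"}},
]
deleted: List[str] = []

# Dry run first, then the real deletion with the same filter.
preview = clear_down(all_clusters, deleted.append, "deployed_by", "AzureDevOps")
clear_down(all_clusters, deleted.append, "deployed_by", "AzureDevOps", show_only=False)
```

Running with the default show_only = True first and reviewing the output is a cheap guard against deleting more than intended, especially when no tag filter is supplied.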
cluster_log_states
cluster_log_states()
cluster_run
cluster_run(cluster_id: str)
Runs a cluster with a given cluster_id and waits for it to reach the running state or to terminate with an error. This call handles the response if the cluster has already been started but isn't running yet: in that case it returns when the cluster has either reached a running state or terminated with an error.