Move all k8S classes to cncf.kubernetes provider (#32767)
* Move all k8S classes to cncf.kubernetes provider

This is the big move of all Kubernetes classes to the provider.

The changes that are implemented in this move:

* replaced all imports from airflow.kubernetes to cncf.kubernetes
  with PEP-562 dynamic import redirection and deprecation messages;
  those messages now support overriding the "replacement" hints
  to make K8s deprecations more accurate
* pre_7_4_0_compatibility package with classes used by past
  providers has been "frozen" and stored in the package with
  import redirections from airflow.kubernetes (with deprecation warnings)
* kubernetes configuration is moved to kubernetes provider
* mypy started complaining about conf and set used in configuration,
  so a better solution to handle deprecations and hinting conf
  returning AirflowConfigParser was added.
* example_kubernetes_executor reads configuration not at the
  top level but in the execute method
* PodMutationHookException and PodReconciliationError have
  been moved to cncf.kubernetes provider and they are imported
  from there with fallback to the airflow.exceptions ones in case
  an old provider is used in Airflow 2.7.0
* k8s methods in task_instance have been deprecated and replaced
  with functions in the "cncf.kubernetes" template_rendering module;
  the old way still works but raises deprecation warnings.
* added extras with versions for celery and k8s
* raise AirflowOptionalProviderFeatureException in case there is
  an attempt to use CeleryK8sExecutor and cncf.k8s is not installed.
* added a few "new" core utils to k8s (hashlib_wrapper etc.)
* both warnings and errors indicate minimum versions for both cncf.k8s
  and Celery providers.
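
The import redirection described in the list above relies on module-level `__getattr__` (PEP 562). The following is a minimal sketch of that technique, not the helper Airflow actually ships: `make_redirecting_module`, `_REDIRECTS` and `_HINT_OVERRIDES` are hypothetical names, and a standard-library class stands in for the moved Kubernetes classes so the sketch runs without Airflow installed.

```python
from __future__ import annotations

import importlib
import types
import warnings

# Hypothetical mapping for illustration: old attribute name -> new "module.attr" path.
_REDIRECTS = {"OldOrderedDict": "collections.OrderedDict"}
# Optional per-name overrides of the "replacement" hint shown in the warning.
_HINT_OVERRIDES = {"OldOrderedDict": "collections.OrderedDict (or a plain dict)"}


def make_redirecting_module(name: str) -> types.ModuleType:
    """Build a module whose missing attributes are resolved lazily (PEP 562)."""
    module = types.ModuleType(name)

    def __getattr__(attr: str):
        target = _REDIRECTS.get(attr)
        if target is None:
            raise AttributeError(f"module {name!r} has no attribute {attr!r}")
        # Use the overridden hint when one is registered, else the real target path.
        hint = _HINT_OVERRIDES.get(attr, target)
        warnings.warn(
            f"`{name}.{attr}` is deprecated. Please use `{hint}` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        module_path, _, attr_name = target.rpartition(".")
        return getattr(importlib.import_module(module_path), attr_name)

    # Storing the function in the module namespace is what activates PEP 562.
    module.__getattr__ = __getattr__
    return module
```

Accessing `make_redirecting_module("legacy_pkg").OldOrderedDict` returns the class from its new location and emits a `DeprecationWarning` carrying the overridden hint. The old name keeps working while callers are steered to the replacement.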

* Update newsfragments/32767.significant.rst

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

---------

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
potiuk and jedcunningham committed Jul 26, 2023
1 parent 8809478 commit e934603
Showing 125 changed files with 2,960 additions and 800 deletions.
1 change: 0 additions & 1 deletion .github/CODEOWNERS
@@ -11,7 +11,6 @@

# Kubernetes
/airflow/kubernetes/ @dstandish @jedcunningham
/airflow/kubernetes_executor_templates/ @dstandish @jedcunningham
/airflow/executors/celery_kubernetes_executor.py @dstandish @jedcunningham
/airflow/executors/kubernetes_executor.py @dstandish @jedcunningham

6 changes: 0 additions & 6 deletions .github/boring-cyborg.yml
@@ -164,19 +164,13 @@ labelPRBasedOnFilePath:
- airflow/**/kubernetes_*.py
- airflow/example_dags/example_kubernetes_executor.py
- airflow/providers/cncf/kubernetes/**/*
- airflow/kubernetes/**/*
- airflow/kubernetes_executor_templates/**/*
- airflow/executors/kubernetes_executor.py
- airflow/providers/celery/executors/celery_kubernetes_executor.py
- docs/apache-airflow/core-concepts/executor/kubernetes.rst
- docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst
- docs/apache-airflow-providers-cncf-kubernetes/**/*
- kubernetes_tests/**/*
- tests/providers/cncf/kubernetes/**/*
- tests/system/providers/cncf/kubernetes/**/*
- tests/kubernetes/**/*
- tests/executors/kubernetes_executor_template_files/**/*
- tests/executors/*kubernetes*.py

area:API:
- airflow/api/**/*
16 changes: 16 additions & 0 deletions .pre-commit-config.yaml
@@ -333,6 +333,22 @@ repos:
pass_filenames: false
entry: ./scripts/ci/pre_commit/pre_commit_check_order_setup.py
additional_dependencies: ['rich>=12.4.4']
- id: check-airflow-k8s-not-used
name: Check airflow.kubernetes imports are not used
language: python
files: ^airflow/.*\.py$
require_serial: true
exclude: ^airflow/kubernetes/
entry: ./scripts/ci/pre_commit/pre_commit_check_airflow_k8s_not_used.py
additional_dependencies: ['rich>=12.4.4']
- id: check-cncf-k8s-only-for-executors
name: Check cncf.kubernetes imports used for executors only
language: python
files: ^airflow/.*\.py$
require_serial: true
exclude: ^airflow/kubernetes/|^airflow/providers/
entry: ./scripts/ci/pre_commit/pre_commit_check_cncf_k8s_used_for_k8s_executor_only.py
additional_dependencies: ['rich>=12.4.4']
- id: check-extra-packages-references
name: Checks setup extra packages
description: Checks if all the libraries in setup.py are listed in extra-packages-ref.rst file
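
The two new hooks above call scripts whose contents are not part of this excerpt. As a rough illustration of what a check like `check-airflow-k8s-not-used` has to do, here is a minimal sketch that scans Python source for imports of a forbidden package using the standard `ast` module. The function name `find_forbidden_imports` and its behaviour are assumptions made for illustration, not the actual script.

```python
from __future__ import annotations

import ast

FORBIDDEN_PREFIX = "airflow.kubernetes"


def find_forbidden_imports(source: str, forbidden: str = FORBIDDEN_PREFIX) -> list[tuple[int, str]]:
    """Return (line number, module) pairs for imports of `forbidden` or its submodules."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            # Also consider "from airflow import kubernetes" style imports.
            modules = [node.module] + [f"{node.module}.{alias.name}" for alias in node.names]
        else:
            continue
        for module in modules:
            if module == forbidden or module.startswith(forbidden + "."):
                hits.append((node.lineno, module))
                break  # report each import statement once
    return sorted(hits)
```

Matching on `forbidden + "."` rather than a bare prefix keeps unrelated packages such as `airflow.kubernetes_executor_templates` from being flagged, and imports from `airflow.providers.cncf.kubernetes` pass untouched.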
4 changes: 4 additions & 0 deletions STATIC_CODE_CHECKS.rst
@@ -146,6 +146,8 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-aiobotocore-optional | Check if aiobotocore is an optional dependency only | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-airflow-k8s-not-used | Check airflow.kubernetes imports are not used | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-airflow-provider-compatibility | Check compatibility of Providers with Airflow | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-apache-license-rat | Check if licenses are OK for Apache | |
@@ -163,6 +165,8 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-changelog-has-no-duplicates | Check changelogs for duplicate entries | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-cncf-k8s-only-for-executors | Check cncf.kubernetes imports used for executors only | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-core-deprecation-classes | Verify usage of Airflow deprecation classes in core | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-daysago-import-from-utils | Make sure days_ago is imported from airflow.utils.dates | |
10 changes: 5 additions & 5 deletions airflow/cli/commands/kubernetes_command.py
@@ -25,12 +25,12 @@
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from airflow.executors.kubernetes_executor import KubeConfig
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.kubernetes_helper_functions import create_pod_id
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models import DagRun, TaskInstance
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils import cli as cli_utils, yaml
from airflow.utils.cli import get_dag
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
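
Code outside Airflow core that has to run against both the old and the new module layout can try the new location first and fall back to the old one, much like the exception fallback mentioned in the commit message. The helper below is a hypothetical sketch of that pattern (`import_first_available` is not an Airflow function) and uses only `importlib`.

```python
from __future__ import annotations

import importlib
from typing import Any


def import_first_available(*dotted_paths: str) -> Any:
    """Return the first attribute that can be imported from the given "module.attr" paths."""
    errors = []
    for path in dotted_paths:
        module_path, _, attr = path.rpartition(".")
        try:
            return getattr(importlib.import_module(module_path), attr)
        except (ImportError, AttributeError, ValueError) as exc:
            # Remember why this candidate failed and try the next one.
            errors.append(f"{path}: {exc}")
    raise ImportError("none of the candidates could be imported: " + "; ".join(errors))


# Possible usage with the paths shown in this diff (requires Airflow to be installed):
# PodGenerator = import_first_available(
#     "airflow.providers.cncf.kubernetes.pod_generator.PodGenerator",
#     "airflow.kubernetes.pod_generator.PodGenerator",
# )
```

Listing the provider path first means the deprecated location is only touched, and its deprecation warning only triggered, when the new provider is unavailable.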
2 changes: 1 addition & 1 deletion airflow/config_templates/__init__.py
@@ -25,4 +25,4 @@
},
}

add_deprecated_classes(__deprecated_classes, __name__)
add_deprecated_classes(__deprecated_classes, __name__, {}, "The `celery` provider must be >= 3.3.0 for that.")
210 changes: 0 additions & 210 deletions airflow/config_templates/config.yml
@@ -1943,21 +1943,6 @@ sentry:
type: string
example: ~
default: ~
local_kubernetes_executor:
description: |
This section only applies if you are using the ``LocalKubernetesExecutor`` in
``[core]`` section above
options:
kubernetes_queue:
description: |
Define when to send a task to ``KubernetesExecutor`` when using ``LocalKubernetesExecutor``.
When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``),
the task is executed via ``KubernetesExecutor``,
otherwise via ``LocalExecutor``
version_added: 2.3.0
type: string
example: ~
default: "kubernetes"
scheduler:
description: ~
options:
@@ -2442,201 +2427,6 @@ elasticsearch_configs:
type: string
example: ~
default: "True"
kubernetes_executor:
description: ~
renamed:
previous_name: kubernetes
version: 2.5.0
options:
api_client_retry_configuration:
description: |
Kwargs to override the default urllib3 Retry used in the kubernetes API client
version_added: 2.6.0
type: string
example: '{ "total": 3, "backoff_factor": 0.5 }'
default: ""
logs_task_metadata:
description: |
Flag to control the information added to kubernetes executor logs for better traceability
version_added: 2.7.0
type: boolean
example: ~
default: "False"
pod_template_file:
description: |
Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
version_added: 1.10.11
type: string
example: ~
default: ""
see_also: ":ref:`concepts:pod_template_file`"
worker_container_repository:
description: |
The repository of the Kubernetes Image for the Worker to Run
version_added: ~
type: string
example: ~
default: ""
worker_container_tag:
description: |
The tag of the Kubernetes Image for the Worker to Run
version_added: ~
type: string
example: ~
default: ""
namespace:
description: |
The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
version_added: ~
type: string
example: ~
default: "default"
delete_worker_pods:
description: |
If True, all worker pods will be deleted upon termination
version_added: ~
type: string
example: ~
default: "True"
delete_worker_pods_on_failure:
description: |
If False (and delete_worker_pods is True),
failed worker pods will not be deleted so users can investigate them.
This only prevents removal of worker pods where the worker itself failed,
not when the task it ran failed.
version_added: 1.10.11
type: string
example: ~
default: "False"
worker_pods_creation_batch_size:
description: |
Number of Kubernetes Worker Pod creation calls per scheduler loop.
Note that the current default of "1" will only launch a single pod
per-heartbeat. It is HIGHLY recommended that users increase this
number to match the tolerance of their kubernetes cluster for
better performance.
version_added: 1.10.3
type: string
example: ~
default: "1"
multi_namespace_mode:
description: |
Allows users to launch pods in multiple namespaces.
Will require creating a cluster-role for the scheduler,
or use multi_namespace_mode_namespace_list configuration.
version_added: 1.10.12
type: boolean
example: ~
default: "False"
multi_namespace_mode_namespace_list:
description: |
If multi_namespace_mode is True while scheduler does not have a cluster-role,
give the list of namespaces where the scheduler will schedule jobs
Scheduler needs to have the necessary permissions in these namespaces.
version_added: 2.6.0
type: string
example: ~
default: ""
in_cluster:
description: |
Use the service account kubernetes gives to pods to connect to kubernetes cluster.
It's intended for clients that expect to be running inside a pod running on kubernetes.
It will raise an exception if called from a process not running in a kubernetes environment.
version_added: ~
type: string
example: ~
default: "True"
cluster_context:
description: |
When running with in_cluster=False change the default cluster_context or config_file
options to Kubernetes client. Leave blank these to use default behaviour like ``kubectl`` has.
version_added: 1.10.3
type: string
example: ~
default: ~
config_file:
description: |
Path to the kubernetes configfile to be used when ``in_cluster`` is set to False
version_added: 1.10.3
type: string
example: ~
default: ~
kube_client_request_args:
description: |
Keyword parameters to pass while calling a kubernetes client core_v1_api methods
from Kubernetes Executor provided as a single line formatted JSON dictionary string.
List of supported params are similar for all core_v1_apis, hence a single config
variable for all apis. See:
https://raw.githubusercontent.com/kubernetes-client/python/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/api/core_v1_api.py
version_added: 1.10.4
type: string
example: ~
default: ""
delete_option_kwargs:
description: |
Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client
``core_v1_api`` method when using the Kubernetes Executor.
This should be an object and can contain any of the options listed in the ``v1DeleteOptions``
class defined here:
https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
version_added: 1.10.12
type: string
example: '{"grace_period_seconds": 10}'
default: ""
enable_tcp_keepalive:
description: |
Enables TCP keepalive mechanism. This prevents Kubernetes API requests to hang indefinitely
when idle connection is time-outed on services like cloud load balancers or firewalls.
version_added: 2.0.0
type: boolean
example: ~
default: "True"
tcp_keep_idle:
description: |
When the `enable_tcp_keepalive` option is enabled, TCP probes a connection that has
been idle for `tcp_keep_idle` seconds.
version_added: 2.0.0
type: integer
example: ~
default: "120"
tcp_keep_intvl:
description: |
When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
to a keepalive probe, TCP retransmits the probe after `tcp_keep_intvl` seconds.
version_added: 2.0.0
type: integer
example: ~
default: "30"
tcp_keep_cnt:
description: |
When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
to a keepalive probe, TCP retransmits the probe `tcp_keep_cnt number` of times before
a connection is considered to be broken.
version_added: 2.0.0
type: integer
example: ~
default: "6"
verify_ssl:
description: |
Set this to false to skip verifying SSL certificate of Kubernetes python client.
version_added: 2.1.0
type: boolean
example: ~
default: "True"
worker_pods_queued_check_interval:
description: |
How often in seconds to check for task instances stuck in "queued" status without a pod
version_added: 2.2.0
type: integer
example: ~
default: "60"
ssl_ca_cert:
description: |
Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate.
version_added: 2.6.0
type: string
example: ~
default: ""
sensors:
description: ~
options:
9 changes: 5 additions & 4 deletions airflow/configuration.py
@@ -46,6 +46,7 @@
from airflow.exceptions import AirflowConfigException
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
from airflow.utils import yaml
from airflow.utils.empty_set import _get_empty_set_for_configuration
from airflow.utils.module_loading import import_string
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.weight_rule import WeightRule
@@ -66,8 +67,6 @@

ENV_VAR_PREFIX = "AIRFLOW__"

EMPTY_SET: Set[tuple[str, str]] = set() # noqa: UP006


def _parse_sqlite_version(s: str) -> tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
@@ -299,7 +298,9 @@ def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
@functools.cached_property
def sensitive_config_values(self) -> Set[tuple[str, str]]: # noqa: UP006
if self.configuration_description is None:
return EMPTY_SET.copy() # we can't use set() here because set is defined below # ¯\_(ツ)_/¯
return (
_get_empty_set_for_configuration()
) # we can't use set() here because set is defined below # ¯\_(ツ)_/¯
flattened = {
(s, k): item
for s, s_c in self.configuration_description.items()
@@ -2313,6 +2314,6 @@ def initialize_auth_manager() -> BaseAuthManager:
FERNET_KEY = "" # Set only if needed when generating a new file
WEBSERVER_CONFIG = "" # Set by initialize_config

conf = initialize_config()
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()
2 changes: 1 addition & 1 deletion airflow/decorators/__init__.pyi
@@ -33,8 +33,8 @@ from airflow.decorators.python_virtualenv import virtualenv_task
from airflow.decorators.sensor import sensor_task
from airflow.decorators.short_circuit import short_circuit_task
from airflow.decorators.task_group import task_group
from airflow.kubernetes.secret import Secret
from airflow.models.dag import dag
from airflow.providers.cncf.kubernetes.secret import Secret

# Please keep this in sync with __init__.py's __all__.
__all__ = [
5 changes: 2 additions & 3 deletions airflow/example_dags/example_kubernetes_executor.py
@@ -32,9 +32,6 @@

log = logging.getLogger(__name__)

worker_container_repository = conf.get("kubernetes_executor", "worker_container_repository")
worker_container_tag = conf.get("kubernetes_executor", "worker_container_tag")

try:
from kubernetes.client import models as k8s
except ImportError:
@@ -163,6 +160,8 @@ def other_namespace_task():
print_stuff()

other_ns_task = other_namespace_task()
worker_container_repository = conf.get("kubernetes_executor", "worker_container_repository")
worker_container_tag = conf.get("kubernetes_executor", "worker_container_tag")

# You can also change the base image, here we used the worker image for demonstration.
# Note that the image must have the same configuration as the
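
The hunk above moves the two `conf.get` calls away from the top of the module so that the `kubernetes_executor` options are no longer read as soon as the file is imported. The general idea of deferring a configuration lookup until it is needed can be sketched as follows. `CountingConf`, the stored values and `worker_image` are hypothetical and exist only to make the read timing observable; they are not Airflow APIs.

```python
from __future__ import annotations


class CountingConf:
    """Hypothetical stand-in for a config object; records how often it is read."""

    def __init__(self, values: dict[tuple[str, str], str]) -> None:
        self._values = values
        self.reads = 0

    def get(self, section: str, key: str) -> str:
        self.reads += 1
        return self._values[(section, key)]


conf = CountingConf(
    {
        ("kubernetes_executor", "worker_container_repository"): "example.invalid/airflow",
        ("kubernetes_executor", "worker_container_tag"): "latest",
    }
)


def worker_image() -> str:
    """Resolve the worker image only when called, not when the module is imported."""
    repository = conf.get("kubernetes_executor", "worker_container_repository")
    tag = conf.get("kubernetes_executor", "worker_container_tag")
    return f"{repository}:{tag}"
```

Importing this module performs no configuration reads. The lookups happen on the first call to `worker_image()`, so a missing section only matters to code paths that actually need it.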