[airflow] Extend rule to check class attributes, methods, arguments (AIR302) (#15083)

## Summary

Airflow 3.0 removes various deprecated functions, members, modules, and
other values. They have been deprecated in 2.x, but the removal causes
incompatibilities that we want to detect. This PR add rules for the
following.

* Removed class attribute
* `airflow.providers_manager.ProvidersManager.dataset_factories` →
`airflow.providers_manager.ProvidersManager.asset_factories`
* `airflow.providers_manager.ProvidersManager.dataset_uri_handlers` →
`airflow.providers_manager.ProvidersManager.asset_uri_handlers`
*
`airflow.providers_manager.ProvidersManager.dataset_to_openlineage_converters`
→
`airflow.providers_manager.ProvidersManager.asset_to_openlineage_converters`
* `airflow.lineage.hook.DatasetLineageInfo.dataset` →
`airflow.lineage.hook.AssetLineageInfo.asset`
* Removed class method (subclasses in airflow should also checked)
* `airflow.secrets.base_secrets.BaseSecretsBackend.get_conn_uri` →
`airflow.secrets.base_secrets.BaseSecretsBackend.get_conn_value`
* `airflow.secrets.base_secrets.BaseSecretsBackend.get_connections` →
`airflow.secrets.base_secrets.BaseSecretsBackend.get_connection`
* `airflow.hooks.base.BaseHook.get_connections` → use `get_connection`
* `airflow.datasets.BaseDataset.iter_datasets` →
`airflow.sdk.definitions.asset.BaseAsset.iter_assets`
* `airflow.datasets.BaseDataset.iter_dataset_aliases` →
`airflow.sdk.definitions.asset.BaseAsset.iter_asset_aliases`
* Removed constructor args (subclasses in airflow should also checked)
* argument `filename_template`
in`airflow.utils.log.file_task_handler.FileTaskHandler`
    * in `BaseOperator`
        * `sla`
        * `task_concurrency` → `max_active_tis_per_dag`
    * in `BaseAuthManager`
        * `appbuilder`
* Removed class variable (subclasses anywhere should be checked)
    * in `airflow.plugins_manager.AirflowPlugin`
        * `executors` (from #43289)
        * `hooks`
        * `operators`
        * `sensors`
* Replaced names
	* `airflow.hooks.base_hook.BaseHook` → `airflow.hooks.base.BaseHook`
* `airflow.operators.dagrun_operator.TriggerDagRunLink` →
`airflow.operators.trigger_dagrun.TriggerDagRunLink`
* `airflow.operators.dagrun_operator.TriggerDagRunOperator` →
`airflow.operators.trigger_dagrun.TriggerDagRunOperator`
* `airflow.operators.python_operator.BranchPythonOperator` →
`airflow.operators.python.BranchPythonOperator`
* `airflow.operators.python_operator.PythonOperator` →
`airflow.operators.python.PythonOperator`
* `airflow.operators.python_operator.PythonVirtualenvOperator` →
`airflow.operators.python.PythonVirtualenvOperator`
* `airflow.operators.python_operator.ShortCircuitOperator` →
`airflow.operators.python.ShortCircuitOperator`
* `airflow.operators.latest_only_operator.LatestOnlyOperator` →
`airflow.operators.latest_only.LatestOnlyOperator`


In additional to the changes above, this PR also add utility functions
and improve docstring.


## Test Plan

A test fixture is included in the PR.
This commit is contained in:
Wei Lee 2024-12-31 13:19:18 +09:00 committed by GitHub
parent 2a1aa29366
commit 253c274afa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 1741 additions and 882 deletions

View file

@ -0,0 +1,29 @@
from airflow.plugins_manager import AirflowPlugin
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
# --- Invalid extensions start
operators = [PluginOperator]
sensors = [PluginSensorOperator]
hooks = [PluginHook]
executors = [PluginExecutor]
# --- Invalid extensions end
macros = [plugin_macro]
flask_blueprints = [bp]
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]
global_operator_extra_links = [
AirflowLink(),
GithubLink(),
]
operator_extra_links = [
GoogleLink(),
AirflowLink2(),
CustomOpLink(),
CustomBaseIndexOpLink(1),
]
timetables = [CustomCronDataIntervalTimetable]
listeners = [empty_listener, ClassBasedListener()]
ti_deps = [CustomTestTriggerRule()]
priority_weight_strategies = [CustomPriorityWeightStrategy]

View file

@ -1,14 +1,17 @@
from datetime import timedelta
from airflow import DAG, dag
from airflow.timetables.simple import NullTimetable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators import trigger_dagrun
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.providers.standard.operators import datetime
from airflow.sensors.weekday import DayOfWeekSensor, BranchDayOfWeekOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler
from airflow.providers.standard.operators import datetime, trigger_dagrun
from airflow.providers.standard.sensors import weekday
from airflow.sensors.weekday import BranchDayOfWeekOperator, DayOfWeekSensor
from airflow.timetables.simple import NullTimetable
DAG(dag_id="class_schedule", schedule="@hourly")
@ -54,10 +57,12 @@ def decorator_deprecated_operator_args():
)
branch_dt_op = datetime.BranchDateTimeOperator(
task_id="branch_dt_op", use_task_execution_day=True
task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5
)
branch_dt_op2 = BranchDateTimeOperator(
task_id="branch_dt_op2", use_task_execution_day=True
task_id="branch_dt_op2",
use_task_execution_day=True,
sla=timedelta(seconds=10),
)
dof_task_sensor = weekday.DayOfWeekSensor(
@ -76,3 +81,12 @@ def decorator_deprecated_operator_args():
branch_dt_op >> branch_dt_op2
dof_task_sensor >> dof_task_sensor2
bdow_op >> bdow_op2
# deprecated filename_template arugment in FileTaskHandler
S3TaskHandler(filename_template="/tmp/test")
HdfsTaskHandler(filename_template="/tmp/test")
ElasticsearchTaskHandler(filename_template="/tmp/test")
GCSTaskHandler(filename_template="/tmp/test")
FabAuthManager(None)

View file

@ -0,0 +1,59 @@
from airflow.datasets.manager import DatasetManager
from airflow.lineage.hook import DatasetLineageInfo, HookLineageCollector
from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager
from airflow.providers.apache.beam.hooks import BeamHook, NotAir302HookError
from airflow.providers.google.cloud.secrets.secret_manager import (
CloudSecretManagerBackend,
)
from airflow.providers.hashicorp.secrets.vault import NotAir302SecretError, VaultBackend
from airflow.providers_manager import ProvidersManager
from airflow.secrets.base_secrets import BaseSecretsBackend
dm = DatasetManager()
dm.register_dataset_change()
dm.create_datasets()
dm.notify_dataset_created()
dm.notify_dataset_changed()
dm.notify_dataset_alias_created()
hlc = HookLineageCollector()
hlc.create_dataset()
hlc.add_input_dataset()
hlc.add_output_dataset()
hlc.collected_datasets()
aam = AwsAuthManager()
aam.is_authorized_dataset()
pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories
base_secret_backend = BaseSecretsBackend()
base_secret_backend.get_conn_uri()
base_secret_backend.get_connections()
csm_backend = CloudSecretManagerBackend()
csm_backend.get_conn_uri()
csm_backend.get_connections()
vault_backend = VaultBackend()
vault_backend.get_conn_uri()
vault_backend.get_connections()
not_an_error = NotAir302SecretError()
not_an_error.get_conn_uri()
beam_hook = BeamHook()
beam_hook.get_conn_uri()
not_an_error = NotAir302HookError()
not_an_error.get_conn_uri()
provider_manager = ProvidersManager()
provider_manager.dataset_factories
provider_manager.dataset_uri_handlers
provider_manager.dataset_to_openlineage_converters
dl_info = DatasetLineageInfo()
dl_info.dataset

View file

@ -30,20 +30,29 @@ from airflow.datasets import (
DatasetAny,
expand_alias_to_datasets,
)
from airflow.datasets.metadata import Metadata
from airflow.datasets.manager import (
DatasetManager,
dataset_manager,
resolve_dataset_manager,
)
from airflow.datasets.metadata import Metadata
from airflow.hooks.base_hook import BaseHook
from airflow.lineage.hook import DatasetLineageInfo
from airflow.listeners.spec.dataset import on_dataset_changed, on_dataset_created
from airflow.metrics.validators import AllowListValidator, BlockListValidator
from airflow.operators import dummy_operator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.branch_operator import BaseBranchOperator
from airflow.operators.dagrun_operator import TriggerDagRunLink, TriggerDagRunOperator
from airflow.operators.dummy import DummyOperator, EmptyOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (
BranchPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
ShortCircuitOperator,
)
from airflow.operators.subdag import SubDagOperator
from airflow.providers.amazon.auth_manager.avp.entities import AvpEntities
from airflow.providers.amazon.aws.datasets import s3
@ -85,7 +94,7 @@ from airflow.utils.dates import (
scale_time_units,
)
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory, mkdirs
from airflow.utils.file import mkdirs
from airflow.utils.helpers import chain, cross_downstream
from airflow.utils.state import SHUTDOWN, terminating_states
from airflow.utils.trigger_rule import TriggerRule
@ -94,61 +103,93 @@ from airflow.www.utils import get_sensitive_variables_fields, should_hide_value_
# airflow root
PY36, PY37, PY38, PY39, PY310, PY311, PY312
DatasetFromRoot
DatasetFromRoot()
dataset_from_root = DatasetFromRoot()
dataset_from_root.iter_datasets()
dataset_from_root.iter_dataset_aliases()
# airflow.api_connexion.security
requires_access, requires_access_dataset
# airflow.auth.managers
is_authorized_dataset
DatasetDetails
DatasetDetails()
# airflow.configuration
get, getboolean, getfloat, getint, has_option, remove_option, as_dict, set
# airflow.contrib.*
AWSAthenaHook
AWSAthenaHook()
# airflow.datasets
Dataset
DatasetAlias
DatasetAliasEvent
DatasetAll
DatasetAny
Dataset()
DatasetAlias()
DatasetAliasEvent()
DatasetAll()
DatasetAny()
expand_alias_to_datasets
Metadata
Metadata()
dataset_to_test_method_call = Dataset()
dataset_to_test_method_call.iter_datasets()
dataset_to_test_method_call.iter_dataset_aliases()
alias_to_test_method_call = DatasetAlias()
alias_to_test_method_call.iter_datasets()
alias_to_test_method_call.iter_dataset_aliases()
any_to_test_method_call = DatasetAny()
any_to_test_method_call.iter_datasets()
any_to_test_method_call.iter_dataset_aliases()
# airflow.datasets.manager
DatasetManager, dataset_manager, resolve_dataset_manager
DatasetManager(), dataset_manager, resolve_dataset_manager
# airflow.hooks
BaseHook()
# airflow.lineage.hook
DatasetLineageInfo
DatasetLineageInfo()
# airflow.listeners.spec.dataset
on_dataset_changed, on_dataset_created
# airflow.metrics.validators
AllowListValidator, BlockListValidator
AllowListValidator(), BlockListValidator()
# airflow.operators.dummy_operator
dummy_operator.EmptyOperator
dummy_operator.DummyOperator
dummy_operator.EmptyOperator()
dummy_operator.DummyOperator()
# airflow.operators.bash_operator
BashOperator
BashOperator()
# airflow.operators.branch_operator
BaseBranchOperator
BaseBranchOperator()
# airflow.operators.dagrun_operator
TriggerDagRunLink()
TriggerDagRunOperator()
# airflow.operators.dummy
EmptyOperator, DummyOperator
EmptyOperator(), DummyOperator()
# airflow.operators.email_operator
EmailOperator
EmailOperator()
# airflow.operators.latest_only_operator
LatestOnlyOperator()
# airflow.operators.python_operator
BranchPythonOperator()
PythonOperator()
PythonVirtualenvOperator()
ShortCircuitOperator()
# airflow.operators.subdag.*
SubDagOperator
SubDagOperator()
# airflow.providers.amazon
AvpEntities.DATASET
@ -175,7 +216,7 @@ gcs.convert_dataset_to_openlineage
mysql.sanitize_uri
# airflow.providers.openlineage
DatasetInfo, translate_airflow_dataset
DatasetInfo(), translate_airflow_dataset
# airflow.providers.postgres
postgres.sanitize_uri
@ -190,28 +231,28 @@ get_connection, load_connections
RESOURCE_DATASET
# airflow.sensors.base_sensor_operator
BaseSensorOperator
BaseSensorOperator()
# airflow.sensors.date_time_sensor
DateTimeSensor
DateTimeSensor()
# airflow.sensors.external_task
ExternalTaskSensorLinkFromExternalTask
ExternalTaskSensorLinkFromExternalTask()
# airflow.sensors.external_task_sensor
ExternalTaskMarker
ExternalTaskSensor
ExternalTaskSensorLinkFromExternalTaskSensor
ExternalTaskMarker()
ExternalTaskSensor()
ExternalTaskSensorLinkFromExternalTaskSensor()
# airflow.sensors.time_delta_sensor
TimeDeltaSensor
TimeDeltaSensor()
# airflow.timetables
DatasetOrTimeSchedule
DatasetTriggeredTimetable
DatasetOrTimeSchedule()
DatasetTriggeredTimetable()
# airflow.triggers.external_task
TaskStateTrigger
TaskStateTrigger()
# airflow.utils.date
dates.date_range
@ -235,7 +276,7 @@ test_cycle
apply_defaults
# airflow.utils.file
TemporaryDirectory, mkdirs
TemporaryDirector(), mkdirs
# airflow.utils.helpers
chain, cross_downstream
@ -253,34 +294,3 @@ has_access_dataset
# airflow.www.utils
get_sensitive_variables_fields, should_hide_value_for_key
from airflow.datasets.manager import DatasetManager
dm = DatasetManager()
dm.register_dataset_change()
dm.create_datasets()
dm.notify_dataset_created()
dm.notify_dataset_changed()
dm.notify_dataset_alias_created()
from airflow.lineage.hook import HookLineageCollector
hlc = HookLineageCollector()
hlc.create_dataset()
hlc.add_input_dataset()
hlc.add_output_dataset()
hlc.collected_datasets()
from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager
aam = AwsAuthManager()
aam.is_authorized_dataset()
from airflow.providers_manager import ProvidersManager
pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories