Kiat Produksi Aliran Udara — Kegagalan dan percobaan ulang yang dikelompokkan – Menuju AI

Kiat Produksi Aliran Udara — Kegagalan dan percobaan ulang yang dikelompokkan – Menuju AI

Pengarang: Guilherme Banhudo

Awalnya diterbitkan di Towards AI the World’s Leading AI and Technology News and Media Company. Jika Anda sedang membangun produk atau layanan terkait AI, kami mengundang Anda untuk mempertimbangkan untuk menjadi sponsor AI. Di Towards AI, kami membantu menskalakan AI dan startup teknologi. Biarkan kami membantu Anda melepaskan teknologi Anda kepada massa.

Kiat Produksi Aliran Udara — Kegagalan dan Percobaan Ulang yang Dikelompokkan

Foto oleh Jackson Simmer di Unsplash

Apache Airflow telah menjadi standar de facto untuk Orkestrasi Data. Namun, selama bertahun-tahun dan versi, itu mengumpulkan serangkaian nuansa dan bug yang dapat menghambat penggunaan produksinya.

Rangkaian artikel ini bertujuan untuk memandu pengguna Apache Airflow melalui proses mengatasi masalah ini, masalah yang sama yang saya hadapi.

Catatan: Seperti biasa, semua kode tersedia di repositori GitHub saya, di sini.

Kegagalan dan percobaan ulang yang dikelompokkan

TLDR: Dalam Airflow adalah umum untuk banyak tugas dianggap satu, kasus penggunaan yang umum adalah penggunaan Operator Jalankan bersama dengan Operator Sensornya. Jika tugas Sensor gagal, maka secara teknis, seluruh proses gagal dan harus dimulai ulang. Namun, Airflow secara default hanya akan mencoba kembali tugas Sensor tanpa mencoba lagi menjalankan tugas Jalankan. Dalam posting ini, Anda akan mengetahui cara memperbaikinya.

TLDR #2: Anda dapat langsung ke solusi, di sini

Pernyataan masalah

Lebih sering daripada tidak, kita semua dihadapkan dengan kebutuhan untuk membuat kumpulan Tugas yang ingin kita Gagal (dan coba lagi) atau Sukses secara keseluruhan.

Pertimbangkan kasus menjalankan AzureDataFactoryRunPipelineOperator, bernama TaskRun dalam kombinasi dengan Sensor yang sesuai untuk mengambil statusnya, AzureDataFactoryPipelineRunStatusSensor bernama TaskSensor.

Sementara TaskRun dapat berhasil dalam memicu pekerjaan, selalu ada kemungkinan untuk contoh yang dijalankan Pabrik Data Azure gagal, dalam hal ini, TaskSensor akan ditandai sebagai gagal dan siap untuk dicoba lagi. Namun, TaskRun akan tetap berhasil karenanya, tidak dicoba lagi.

Dengan kata lain, kami ingin tugas terkait gagal atau berhasil secara keseluruhan.

Mengapa ini terjadi?

Di Apache Airflow, meskipun tugas ditautkan melalui pembentukan ketergantungan (baik melalui <<, >> operator ketergantungan atau fungsi set_downstream dan set_upstream) namun, ini hanya membantu menentukan aliran — atau Grafik Acyclic Langsung — dari Tugas Anda, dan sementara itu mendefinisikan dependensi satu arah (Tugas hilir), itu tidak menyampaikan informasi mengenai dependensi hulu. Dengan kata lain, jika kita menggabungkan TaskRun dengan TaskSensor menggunakan operator ketergantungan yang sesuai:

Operator Tugas diikuti oleh operator Sensor terkait TaskRun = AzureDataFactoryRunPipelineOperator(…)
TaskSensor = AzureDataFactoryPipelineRunStatusSensor(…)
TaskRun >> TaskSensor

Apache Airflow mengetahui bahwa jika TaskRun gagal, TaskSensor akan dilewati saat memeriksa tugas hilir TaskRun (TaskSensor) dan melewatinya. Namun, hal yang sama tidak dapat disimpulkan di sisi yang berlawanan, karena Tugas hulu mungkin memang berhasil, dan kami cenderung tidak ingin mengulanginya. Misalnya, jika alih-alih TaskRun dan TaskSensor, kami akan memiliki TaskExtract dan TaskTransform dalam urutan tersebut, dan TaskExtract berhasil, tetapi TaskTransform gagal, kami tidak ingin TaskExtract dicoba ulang, dan ini adalah perilaku default Airflow, yaitu, saja, tepat.

Solusinya

Tidak seperti di artikel kami sebelumnya, di mana kami menjelajahi penggunaan ORM untuk mengekstrak informasi DAGRun, solusi untuk masalah kami berkisar pada Tugas secara langsung karena mengandung pengetahuan tentang Tugas hulu mereka sehingga solusi untuk masalah kami jauh lebih sederhana.

Solusi untuk masalah kita kemudian dibagi menjadi dua langkah sederhana:

Ekstrak daftar semua TaskInstances upstream dari TaskInstance tertentu Tandai TaskInstances upstream untuk dicoba lagi atau gagal, bergantung pada perilaku yang diperlukan Tambahkan fungsi callback yang sesuai ke definisi Tugas

Langkah 1: Ekstrak daftar semua TaskInstances upstream dari TaskInstance tertentu

Langkah pertama mungkin yang paling rumit karena ada dua cara berbeda untuk mengatasi masalah:

Kami dapat menandai tugas dalam kedalaman hulu yang ditentukan Atau kami dapat menandai tugas berdasarkan ID Tugas mereka

Kami akan mengeksplorasi dan mempersiapkan setiap metode dan akhirnya menggabungkan keduanya untuk fungsionalitas yang diperluas.

Dimulai dengan metode pertama dan paling sederhana terdiri dari empat langkah:

Dapatkan daftar Tugas yang akan ditandai berdasarkan namanya Ambil contoh TaskInstance yang namanya ada dalam nama Tugas yang disediakan

https://medium.com/media/b556d25d4cbf7b386de110b3047feecf/href

Metode kedua lebih kompleks dan rapuh dan hanya bekerja di bawah asumsi tertentu: semua Tugas hulu yang akan ditandai hanya dependensi linier yang berarti tidak ada percabangan yang terlibat dalam kedalaman pemindaian yang ditentukan.

Lakukan iterasi untuk setiap tingkat kedalaman pemindaian yang ditentukan dari tingkat yang lebih rendah ke tingkat yang lebih tinggi Mengambil dan memvalidasi jumlah ID Tugas upstream untuk Tugas pada tingkat kedalaman pemindaian yang ditentukan Tambahkan ID Tugas upstream yang diambil Lanjutkan ke langkah loop berikutnya untuk Tugas di tingkat kedalaman pemindaian berikutnya Setelah loop selesai, ambil Instance Tugas yang terkait dengan ID Tugas yang diambil.

https://medium.com/media/e4a32c658f3a983c8fdc62878a5e294f/href

Menggabungkan dua metode dan membungkusnya dalam fungsi yang dapat digunakan kembali menghasilkan langkah pertama dan utama kami:

https://medium.com/media/4454f11b785847c2a930471f49a9479c/href

Langkah 2: Tandai TaskInstances upstream untuk dicoba lagi atau sebagai gagal, bergantung pada perilaku yang diperlukan.

Langkah kedua adalah implementasi langsung yang memanfaatkan _get_upstream_task_instance_list yang ditentukan sebelumnya dan hanya menandai TaskInstances terkait dengan status yang diinginkan.

https://medium.com/media/88992506e096582559b1aa62f4c2ffe9/href

Langkah 3: Tambahkan fungsi panggilan balik yang sesuai ke definisi Tugas

Sekarang kami telah menambahkan kemungkinan untuk mengekstrak semua TaskInstances upstream yang terkait dengan TaskInstance tertentu dan menandainya dengan status yang diinginkan, kami harus memberi tahu Airflow cara memanfaatkannya.

Langkah terakhir adalah mengasosiasikan fungsi kita dengan on_failure_callback dan on_retry_callback Definisi tugas callback:

https://medium.com/media/904f3593535934c4989ac2cbe4b79157/href

Beri tahu saya di komentar jika Anda menemukan sumber daya ini berguna dan seperti biasa, Anda dapat menemukan kode ini di repositori GitHub saya!

Kiat Produksi Aliran Udara — Kegagalan dan percobaan ulang yang dikelompokkan awalnya diterbitkan di Towards AI on Medium, di mana orang-orang melanjutkan percakapan dengan menyoroti dan menanggapi cerita ini.

Diterbitkan melalui Menuju AI

Author: Jeffrey Hayes