So I was in this situation, struggling for like 5 hours yesterday (yes, the last 5 Friday work hours, the best ones to get stuck on some code), trying to pass parameters using the TriggerDagRunOperator, wanting to die, but in the end achieving it.

Maybe I was just not experienced enough and it was a really easy thing to fix, but today I'll show you how to do it, so you don't have to struggle as I did. Let's get into it.

The Problem

If you want to go straight to the solution, you can skip this section.

I had 2 data sources: an ERP, and one content environment (from now on I'll call it "env") from a CMS (if you don't know what a CMS is, I explain a little bit about it in this post).

I had 2 DAGs that ran at the same time (with the same schedule_interval) and synced data from the ERP to the CMS. Each DAG syncs a specific type of data to the same env. Until now, both DAGs were run individually, updating the CMS environment asynchronously.

The sync process between the 2 data sources is not free of failures, so a new need came up: first create a backup of the env, then sync the data to a new env that is a copy of the old one. If anything goes wrong, we can just switch environments and delete the broken one. With this, the 2 DAGs cannot run asynchronously anymore; they have to sync the data to the same environment.

The proposed solution was to create a new DAG (which I'll call the Wrapper from now on) that first runs this create-backup-env task and then triggers the 2 DAGs using the TriggerDagRunOperator. Furthermore, the 2 DAGs can receive quite a few config parameters that turn certain tasks on or off, through the Trigger DAG w/ config feature that Airflow provides, so these parameters also have to be available in the Wrapper DAG. Also, these DAGs can no longer be executed manually or on a schedule, only through the Wrapper DAG: the create-backup-env task always has to run first, so the 2 DAGs push data to the same env and don't push to old envs that will no longer be used.

The Solution

FYI, I simplified the solution a lot but kept the main components untouched.

To use the TriggerDagRunOperator, we need to define something like this:

# Wrapper DAG
from airflow.decorators import task, dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import get_current_context
from datetime import datetime

@dag(start_date=datetime(2022, 1, 7), catchup=False)
def wrapper_dag():

    @task
    def create_backup_env():
        ...
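Filling in the rest, here's a minimal sketch of how the whole Wrapper could look. Keep in mind that everything concrete in it is a placeholder of mine: the DAG ids sync_products and sync_contents and the env parameter are made up, and I'm assuming Airflow 2.1+, where conf is a templated field of the TriggerDagRunOperator, so values can be pulled from XCom at trigger time.

# Wrapper DAG: create the backup env first, then trigger both sync DAGs.
from datetime import datetime

from airflow.decorators import task, dag
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

@dag(start_date=datetime(2022, 1, 7), catchup=False)
def wrapper_dag():

    @task
    def create_backup_env() -> str:
        # Parameters sent with "Trigger DAG w/ config" land in dag_run.conf
        conf = get_current_context()["dag_run"].conf or {}
        # ... back up the current env and create the new copy here ...
        return conf.get("env", "fallback-env")  # return value is pushed to XCom

    backup = create_backup_env()

    # conf is templated, so the env id can be pulled from the XCom pushed by
    # create_backup_env and forwarded, along with any other parameters, to
    # the triggered DAGs.
    trigger_sync_products = TriggerDagRunOperator(
        task_id="trigger_sync_products",
        trigger_dag_id="sync_products",  # hypothetical DAG id
        conf={"env": "{{ ti.xcom_pull(task_ids='create_backup_env') }}"},
        wait_for_completion=True,  # don't mark the wrapper done until the sync finishes
    )
    trigger_sync_contents = TriggerDagRunOperator(
        task_id="trigger_sync_contents",
        trigger_dag_id="sync_contents",  # hypothetical DAG id
        conf={"env": "{{ ti.xcom_pull(task_ids='create_backup_env') }}"},
        wait_for_completion=True,
    )

    # backup first, then both triggers in parallel
    backup >> [trigger_sync_products, trigger_sync_contents]

wrapper = wrapper_dag()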
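On the receiving side, each of the 2 triggered DAGs reads the parameters the same way, whether the run was started by the Wrapper or manually with Trigger DAG w/ config. Again a sketch; the task and the extra flag are made up:

# Inside one of the triggered DAGs
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def sync_data():
    # Whatever the TriggerDagRunOperator put into conf={...} shows up here
    conf = get_current_context()["dag_run"].conf or {}
    env = conf.get("env")                           # set by the Wrapper
    skip_cleanup = conf.get("skip_cleanup", False)  # hypothetical on/off flag
    # ... sync to `env`, executing or skipping tasks based on the flags ...

In templated operator fields, the same values are available as {{ dag_run.conf['env'] }}.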
airflow tasks test parallel_dag get_users 2022-3-1

Image 2 - Testing an Airflow task through Terminal (image by author)

The task execution succeeded, and here's what it saved to the data folder:

Image 3 - Saved users in JSON format (image by author)

That's all we need for now, so let's test the DAG through the Airflow homepage next. Open up the Airflow webserver page and open our new DAG. Here's what it looks like in the Graph view:

Image 4 - Tasks of the Airflow DAG connected sequentially (image by author)

You can see that the tasks are connected in a sequential manner - one after the other.
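For context, a DAG wired this way would look roughly like the sketch below. Only the parallel_dag id, the get_users task, and the date are confirmed by the test command above; the endpoint URL and the other three task names are assumptions on my part.

# Sequential version: each GET request waits for the one before it.
import json
from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

def fetch(endpoint: str) -> None:
    # GET one endpoint and save the response to the data folder
    data = requests.get(f"https://jsonplaceholder.typicode.com/{endpoint}").json()
    with open(f"data/{endpoint}.json", "w") as f:
        json.dump(data, f)

with DAG("parallel_dag", start_date=datetime(2022, 3, 1),
         schedule_interval="@daily", catchup=False) as dag:
    get_users = PythonOperator(task_id="get_users", python_callable=fetch, op_args=["users"])
    get_posts = PythonOperator(task_id="get_posts", python_callable=fetch, op_args=["posts"])
    get_comments = PythonOperator(task_id="get_comments", python_callable=fetch, op_args=["comments"])
    get_todos = PythonOperator(task_id="get_todos", python_callable=fetch, op_args=["todos"])

    # one after the other
    get_users >> get_posts >> get_comments >> get_todos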
It's a huge waste of time, since the GET requests aren't connected in any way. Running the DAG confirms the tasks are running sequentially:

Image 5 - Airflow DAG running tasks sequentially (image by author)

But probably the best confirmation is the Gantt view, which shows the time each task took:

Image 6 - Airflow DAG runtime in the Gantt view (image by author)

Let's go back to the code editor and modify the DAG so the tasks run in parallel. To start, we'll need to write another task that basically does nothing; it's there only so we can connect the other tasks to something. Let's write it above the current first task:
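A task that does nothing can be a plain DummyOperator (renamed EmptyOperator in Airflow 2.4+). Keeping the same assumed tasks as in the sequential sketch above, the modified DAG could look like this:

# Parallel version: an empty "start" task fans out to all four API tasks.
import json
from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.dummy import DummyOperator  # airflow.operators.empty on 2.4+
from airflow.operators.python import PythonOperator

def fetch(endpoint: str) -> None:
    # GET one endpoint and save the response to the data folder
    data = requests.get(f"https://jsonplaceholder.typicode.com/{endpoint}").json()
    with open(f"data/{endpoint}.json", "w") as f:
        json.dump(data, f)

with DAG("parallel_dag", start_date=datetime(2022, 3, 1),
         schedule_interval="@daily", catchup=False) as dag:
    start = DummyOperator(task_id="start")  # does nothing; just an anchor to fan out from

    api_tasks = [
        PythonOperator(task_id=f"get_{name}", python_callable=fetch, op_args=[name])
        for name in ("users", "posts", "comments", "todos")
    ]

    # every API task depends only on start, so all four can run at once
    start >> api_tasks

Since each API task now depends only on start, the scheduler is free to run all four at the same time, given enough parallelism in the executor configuration.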
You can see how the Graph view has changed:

Image 7 - DAG view showing the tasks will run in parallel (image by author)

The start task will now run first, followed by the other four tasks that connect to the APIs and run in parallel. Trigger the DAG once again and inspect the Tree view - you'll see that the tasks have started running at the same time:

Image 8 - Inspecting the running DAG (image by author)

The best indicator is, once again, the Gantt view:

Image 9 - Airflow DAG runtime in the Gantt view (image by author)

Bars representing the runtimes are placed on top of each other, indicating the tasks have indeed run in parallel. That's all I wanted to cover today, so let's wrap things up next.

Today you've successfully written your first Airflow DAG that runs its tasks in parallel. It's a huge milestone, especially because you can be more efficient now. Most of the time you don't need to run similar tasks one after the other, so running them in parallel is a huge time saver.

In the following article, we'll take a deep dive into Airflow XComs, a method of sending data between tasks. Stay tuned for that, and I'll make sure to publish the article in a couple of days.