class BaseXCom(context=None)
Bases: …, …_mixin.LoggingMixin

Base class for XCom objects.

Attributes: __tablename__ = xcom, dag_run_id, task_id, map_index, key, dag_id, run_id, value, timestamp, __table_args__, dag_run, execution_date

init_on_load()
Called by the ORM after the instance has been loaded from the DB or otherwise reconstituted, i.e. it automatically deserializes the XCom value when loading from the DB.

classmethod get_value(*, ti_key, key=None, session=NEW_SESSION)
Retrieve an XCom value for a task instance.
This method returns "full" XCom values (i.e. it uses deserialize_value from the XCom backend). Use get_many() if you want the "shortened" value. If there are no results, None is returned. If multiple XCom entries match the criteria, an arbitrary one is returned.
Parameters:
ti_key – The TaskInstanceKey to look up the XCom for.
key – … Pass None (default) to remove the filter.
session – … If not given, a new session will be created.

classmethod delete(xcoms, session)
Delete one or multiple XCom entries.

static serialize_value(value, *, key=None, task_id=None, dag_id=None, run_id=None, map_index=None)
Serialize XCom value to str or pickled object.

static deserialize_value(result)
Deserialize XCom value from str or pickle object.

orm_deserialize_value()
Deserialize method which is used to reconstruct the ORM XCom object. This method should be overridden in custom XCom backends to avoid unnecessary requests or other resource-consuming operations when creating the XCom ORM model.

resolve_xcom_backend()
Confirms that the custom XCom class extends BaseXCom. Compares the function signature of the custom XCom serialize_value to the base XCom serialize_value.

Airflow is a robust workflow pipeline framework that we've used at Precocity with a number of clients, with great success. This blog is not geared towards introducing you to Airflow and all that it can do. Instead, it focuses on a couple of XCOM use cases that may be beneficial to your own projects.

I'm going to walk you through my own introduction to XCOM, which came from Michal Karzynski's great blog. So rather than reinvent the wheel, I suggest you read his blog and reference his code where applicable; I will call out the changes I made.

The Problem

First, a little bit of background: as part of our Airflow implementations, we've developed custom plugins that do a great job of encapsulating the need to query databases, store the results as CSV files in an S3 or GCS bucket, and then ingest that data into a cloud data warehouse. Other plugins handle the SFTP transfer of files, GnuPG encryption and decryption tasks, etc. … writing code in your DAGs can be covered in a separate blog post.

For the SFTP handling, dynamic tasks were written that downloaded the file, decrypted it and then deleted the source file on the SFTP site. The SFTP implementation went through a couple of revisions, primarily due to the chattiness of the DAG. This is what some of the code looked like:

    directory_list = sftp_handler('sftp', None, '/Remote/', None, SFTPToS3Operation.LIST)
    ...
        destination_file_path='s3://' + bucket_name + '/SFTP/raw/' + file,
    ...
    db_load_task = DBLoadOperator(task_id='data_load',
    ...
    delete_remote_file_task = SFTPToS3Operator(
        task_id='delete_remote_file' + '-' + file,
    ...
    get_remote_file_task.set_downstream(db_load_task)
    db_load_task.set_downstream(delete_remote_file_task)

It's not a complicated DAG. You can see that the task IDs are uniquely named after the files found on the remote SFTP site, because the filename is appended to each task_id.

The problem came down to one that currently exists in Airflow 1.9.0: the scheduler refreshes the DAG list every second, because it doesn't maintain DAG state and the scheduler interval isn't currently configurable. AFAIK, this hasn't changed in Airflow 1.10.0.

Each refresh re-executes the DAG file's top-level code, including the LIST call made through sftp_handler. Because the code therefore retrieves a remote file list every second, there's an excessive amount of chattiness from Airflow to the SFTP server. Additionally, any log entries generated by executing the top-level code essentially create "noise" that is best left unseen.

Prior to trying XCOM, my solution was to create an Airflow variable that tracked actual execution of the DAG. Only if the DAG was really executing would the LIST command run on the SFTP site and generate the dynamic tasks from there. While this generated less noise, it added complexity: if one of the upstream tasks failed, the Airflow variable had to be cleared so the DAG would run again on the next scheduled interval.

From a debugging standpoint, because an Airflow variable was being read to prevent task execution, the DAG always showed the task that tested for the Airflow variable. It rarely showed the fully populated DAG graph with the dynamically rendered tasks. Occasionally, you would see the full graph during execution, only to have it disappear when the DAG completed.
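The toy model below makes the chattiness concrete without requiring Airflow. It simulates a scheduler re-running a DAG file's top-level code on every refresh. FakeSFTP, build_dag_top_level, build_dag_deferred and simulate_parses are hypothetical names invented for this sketch; they are not Airflow or plugin APIs. The model assumes one parse per refresh, as the post describes.

```python
"""Toy model: why an SFTP LIST in a DAG file's top-level code is chatty.

All names here are hypothetical stand-ins, not Airflow APIs.
"""


class FakeSFTP:
    """Stand-in for the remote SFTP server; counts LIST round trips."""

    def __init__(self, files):
        self.files = list(files)
        self.list_calls = 0

    def list_dir(self):
        self.list_calls += 1
        return list(self.files)


def build_dag_top_level(sftp):
    # Mirrors the pattern in the post: the LIST runs while the file is
    # parsed, so every scheduler refresh hits the server.
    return [f"delete_remote_file-{name}" for name in sftp.list_dir()]


def build_dag_deferred(sftp):
    # Alternative: parsing only defines a callable; the LIST happens
    # when the task actually executes.
    def list_task():
        return sftp.list_dir()

    return list_task


def simulate_parses(build, sftp, parses):
    """Re-run the 'DAG file' the way a refresh loop would; return the last result."""
    result = None
    for _ in range(parses):
        result = build(sftp)
    return result


if __name__ == "__main__":
    eager = FakeSFTP(["a.csv.gpg", "b.csv.gpg"])
    simulate_parses(build_dag_top_level, eager, parses=60)
    print(eager.list_calls)  # 60: one LIST per parse

    lazy = FakeSFTP(["a.csv.gpg", "b.csv.gpg"])
    task = simulate_parses(build_dag_deferred, lazy, parses=60)
    print(lazy.list_calls)  # 0: nothing listed while parsing
    task()
    print(lazy.list_calls)  # 1: listed once, at run time
```

The deferred version avoids the repeated round trips, but it also shows why the problem is awkward. Once the listing happens at run time, it can no longer shape the per-file task graph at parse time. That is why the Airflow variable workaround described above kept the LIST in the DAG file and only gated when it ran.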
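Each task_id in the DAG above is built by appending a raw remote filename. A name containing spaces or other unexpected characters could therefore be rejected by Airflow versions that validate task IDs. The hypothetical helper make_task_id below sketches one way to guard against that. It assumes the rule Airflow 2.x enforces for task IDs: word characters, dots and dashes only, at most 250 characters. Check the rule for the Airflow version you actually run.

```python
import re

# Assumption: Airflow 2.x task_id rule (word chars, dots, dashes; max 250).
_INVALID_CHARS = re.compile(r"[^\w.-]")
MAX_TASK_ID_LEN = 250


def make_task_id(prefix: str, filename: str) -> str:
    """Build a per-file task_id such as 'delete_remote_file-<file>'.

    Hypothetical helper: characters outside the allowed set (spaces,
    slashes, parentheses, ...) become underscores, then the result is
    truncated to the maximum length.
    """
    safe_name = _INVALID_CHARS.sub("_", filename)
    return f"{prefix}-{safe_name}"[:MAX_TASK_ID_LEN]


if __name__ == "__main__":
    for name in ["orders.csv.gpg", "Sales Report (May).csv.gpg"]:
        print(make_task_id("delete_remote_file", name))
    # delete_remote_file-orders.csv.gpg
    # delete_remote_file-Sales_Report__May_.csv.gpg
```

Substitution and truncation can map two different filenames to the same ID. For example, "a b.csv" and "a_b.csv" both become "a_b.csv". A DAG built this way would therefore still need to detect duplicates before creating tasks.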