Source code for tests.system.apache.hive.example_twitter_dag
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This is an example DAG for managing Twitter data."""

from __future__ import annotations

import os
from datetime import date, datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.standard.operators.bash import BashOperator

# --------------------------------------------------------------------------------
# Caveat: This DAG will not run because of missing scripts.
# The purpose of this is to give you a sample of a real-world example DAG!
# --------------------------------------------------------------------------------

# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_twitter_dag"

with DAG(
    dag_id=DAG_ID,
    schedule="@daily",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:

    @task
    def fetch_tweets():
        """
        This task should call the Twitter API and retrieve yesterday's tweets from and to the four
        Twitter users (Twitter_A, ..., Twitter_D). It should generate eight CSV output files, named
        direction(from or to)_twitterHandle_date.csv.
        """
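        # A minimal, hedged sketch of the output this placeholder is expected to produce, using only
        # the standard library. The rows are dummy values; a real implementation would call the
        # Twitter API to fill them in. The file names follow the convention the loading loops below
        # expect: direction(from or to)_twitterHandle_date.csv, written to the local /tmp/ directory.
        import csv

        dt = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
        for direction in ("from", "to"):
            for letter in ("A", "B", "C", "D"):
                channel = f"{direction}Twitter_{letter}"
                with open(f"/tmp/{direction}_{channel}_{dt}.csv", "w", newline="") as f:
                    writer = csv.writer(f)
                    writer.writerow(["tweet_id", "created_at", "text"])  # header only, no API call here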

    @task
    def clean_tweets():
        """
        This is a placeholder to clean the eight files. In this step you can drop or cherry-pick
        columns and different parts of the text.
        """
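        # A minimal sketch, assuming the eight CSV files written by fetch_tweets sit in /tmp/:
        # keep only the columns in `columns_to_keep` (an illustrative choice) and rewrite each
        # file in place.
        import csv
        import glob

        columns_to_keep = ["tweet_id", "created_at", "text"]
        for path in glob.glob("/tmp/*Twitter_[A-D]_*.csv"):
            with open(path, newline="") as f:
                rows = list(csv.DictReader(f))
            with open(path, "w", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=columns_to_keep, extrasaction="ignore")
                writer.writeheader()
                writer.writerows(rows)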

    @task
    def analyze_tweets():
        """
        This is a placeholder to analyze the Twitter data. It could simply be a sentiment analysis
        using an algorithm like bag of words, or something more complicated. You can also take a
        look at web services to do such tasks.
        """
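        # A minimal sketch of a bag-of-words style sentiment score, using only the standard library
        # and a tiny illustrative word list (not a real sentiment lexicon).
        import csv
        import glob

        positive = {"great", "good", "love", "thanks"}
        negative = {"bad", "broken", "hate", "slow"}
        for path in glob.glob("/tmp/*Twitter_[A-D]_*.csv"):
            score = 0
            with open(path, newline="") as f:
                for row in csv.DictReader(f):
                    words = row.get("text", "").lower().split()
                    score += sum(w in positive for w in words) - sum(w in negative for w in words)
            print(f"{path}: sentiment score {score}")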

    @task
    def transfer_to_db():
        """
        This is a placeholder to extract a summary from the Hive data and store it in MySQL.
        """
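        # A minimal sketch only: query a per-day summary from Hive and insert it into MySQL via the
        # provider hooks. The connection ids, table name, and query are assumptions made for
        # illustration; the MySQL provider is an extra dependency not required by this example.
        from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
        from airflow.providers.mysql.hooks.mysql import MySqlHook

        hive = HiveServer2Hook(hiveserver2_conn_id="hiveserver2_default")
        rows = hive.get_records("SELECT dt, COUNT(*) FROM toTwitter_A GROUP BY dt")
        MySqlHook(mysql_conn_id="mysql_default").insert_rows(
            table="tweet_summary", rows=rows, target_fields=["dt", "tweet_count"]
        )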

    fetch = fetch_tweets()
    clean = clean_tweets()
    analyze = analyze_tweets()
    hive_to_mysql = transfer_to_db()

    fetch >> clean >> analyze

    # --------------------------------------------------------------------------------
    # The following tasks are generated using a for loop. The first task puts the eight
    # csv files to HDFS. The second task loads these files from HDFS to the respective
    # Hive tables. These two for loops could be combined into one loop. However, in most
    # cases you will be running different analyses on your incoming and outgoing tweets,
    # and hence they are kept separated in this example.
    # --------------------------------------------------------------------------------

    from_channels = ["fromTwitter_A", "fromTwitter_B", "fromTwitter_C", "fromTwitter_D"]
    to_channels = ["toTwitter_A", "toTwitter_B", "toTwitter_C", "toTwitter_D"]
    yesterday = date.today() - timedelta(days=1)
    dt = yesterday.strftime("%Y-%m-%d")
    # define where you want to store the tweets csv file in your local directory
    local_dir = "/tmp/"
    # define the location where you want to store in HDFS
    # (the leading space separates the source and destination arguments in the bash command below)
    hdfs_dir = " /tmp/"

    for channel in to_channels:
        file_name = f"to_{channel}_{dt}.csv"

        load_to_hdfs = BashOperator(
            task_id=f"put_{channel}_to_hdfs",
            bash_command=(
                f"HADOOP_USER_NAME=hdfs hadoop fs -put -f {local_dir}{file_name}{hdfs_dir}{channel}/"
            ),
        )

        # [START create_hive]
        load_to_hive = HiveOperator(
            task_id=f"load_{channel}_to_hive",
            hql=(
                f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' "
                f"INTO TABLE {channel} "
                f"PARTITION(dt='{dt}')"
            ),
        )
        # [END create_hive]

        analyze >> load_to_hdfs >> load_to_hive >> hive_to_mysql

    for channel in from_channels:
        file_name = f"from_{channel}_{dt}.csv"

        load_to_hdfs = BashOperator(
            task_id=f"put_{channel}_to_hdfs",
            bash_command=(
                f"HADOOP_USER_NAME=hdfs hadoop fs -put -f {local_dir}{file_name}{hdfs_dir}{channel}/"
            ),
        )

        load_to_hive = HiveOperator(
            task_id=f"load_{channel}_to_hive",
            hql=(
                f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' "
                f"INTO TABLE {channel} "
                f"PARTITION(dt='{dt}')"
            ),
        )

        analyze >> load_to_hdfs >> load_to_hive >> hive_to_mysql

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
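# For example (test module path assumed from the module name above):
#   pytest tests/system/apache/hive/example_twitter_dag.py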