Hello, I am new to Apache Airflow and need to process some data with it. Below is my approach to calling the class methods inside the DAG, but I still get an error. I am posting my code here in the hope of a little help; I have searched the documentation, but nothing there has helped. Is the method called properly?



import pandas as pd
from io import StringIO
import re 
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, Float
from twilio.rest import Client
from twilio.base.exceptions import TwilioRestException
from sqlalchemy.exc import OperationalError
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime
from datetime import timedelta
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
from datetime import timedelta


class IngestCloudOperator(PythonOperator):
    # initialize variables
    def __init__(self, df,df2)->pd.DataFrame():
        self.df = df
        self.df2 = df2

    def process_data(self):
        # download the CSV
        try:
            url='https://drive.google.com/file/d/14JcOSJAWqKOUNyadVZDPm7FplA7XYhrU/view'
            file_id=url.split('/')[-2]
            download_data='https://drive.google.com/uc?id=' + file_id
            df = pd.read_csv(download_data)
            self.df = df
            return df
        except (NameError,TypeError,UnboundLocalError,ValueError) as error:
            print('Declare la variable correcta:',error) 
            raise


    def transform_data(self):
        # transform the DataFrame
        try:
            df = self.df
            col = ['origin_coord','destination_coord']
            keywords = ["POINT","(",")"," "]
            float_point = ["."]
            for i in range(len(keywords)):
                df = df[['origin_coord','destination_coord']].apply(lambda x: x.str.replace(keywords[i],'',regex=True))
            for i in range(len(float_point)):
                df = df[['origin_coord','destination_coord']].apply(lambda x: x.str.replace(float_point[i],',',regex=True))
            self.df2 = df
            self.df2[['origin_coord','destination_coord']].astype(str)

            aux_indexes=[]
            aux_indexes_=[]
            for item, item_ in zip(self.df2.origin_coord,self.df2.destination_coord):
                index = item.find(",")
                index_ = item_.find(",")
                aux_indexes.append(index)
                aux_indexes_.append(index_)

            self.df2 = self.df2[['origin_coord','destination_coord']].apply(lambda x: x.str.replace(',','',regex=True))


            new_index_=[]
            new_index__=[]


            for index, item in zip(aux_indexes,self.df2.origin_coord):
                item = item[:index] + ',' + item[index:]
                new_index_.append(item)
                new_index_=list(dict.fromkeys(new_index_))


            for index, item in zip(aux_indexes_,self.df2.destination_coord):
                item = item[:index] + ',' + item[index:]
                new_index__.append(item)
                new_index__= list(dict.fromkeys(new_index__))

            df4 = pd.DataFrame(new_index_).rename(columns={0:'origin_coord'})
            df5 = pd.DataFrame(new_index__).rename(columns={0:'destination_coord'})

            df['origin_coord'] = df4[['origin_coord']]
            df['destination_coord'] = df5[['destination_coord']]
            df = df[['origin_coord','destination_coord']].apply(lambda x: x.str.replace(",",".",regex=True)).astype(float)
            df = df[['origin_coord','destination_coord']].apply(lambda x: x.round(1))
            self.df2 = df
            return self.df2
        except (NameError,TypeError,UnboundLocalError,ValueError) as error:
            print('Declare la variable correcta:',error) 
            raise
       
        
    def update_data(self):
        try:
            
            df2 = self.df2
            df = self.df

            df['origin_coord']= df2[['origin_coord']]
            df['destination_coord']= df2[['destination_coord']]
            df['date'] = pd.to_datetime(df['datetime']).dt.date

            df['day'] = df['date'].apply(lambda x : x.day)
            df['Hour'] = pd.to_datetime(df['datetime']).dt.hour
            df['week'] = df['day'].apply(lambda x: (x-1)//7+1) 
            df2 = df.groupby(['region','origin_coord','destination_coord','datetime'])['region'].agg(['count'], ascending=True).\
                rename(columns={'count':'Total'}).reset_index()
            df2 = df.groupby(['region'])['week'].agg(['mean'], ascending=True).\
                rename(columns={'mean':'weekly_avg'}).reset_index()
            self.df2 = df2

            df3 = pd.merge(left=df,right=df2,how="inner", on=["region"])

            df4 = df3.groupby(['region','origin_coord','destination_coord','Hour','weekly_avg'])['weekly_avg'].\
            agg(['count'],ascending=True).rename(columns={'count':'Total'}).reset_index()
            df4 = df4.drop(['Total'], axis=1)
            self.df = df4
            return df4
        except (NameError,TypeError,UnboundLocalError,ValueError) as error:
            print('Declare la variable correcta:',error) 
            raise
    
    
    def import_data(self):
        try:
            
            # load the final DataFrame and the connection to the database configured in gcloud
            df = self.df
            conn_string = 'postgresql://test:test@34.72.31.189:5432/test'
            engine = create_engine(conn_string, echo=True)
            engine.connect()

            # create the table
            meta = MetaData()
            trips_data = Table(
             'trips_data', meta, 
               Column('region', String(60)), 
               Column('origin_coord', Float), 
               Column('destination_coord', Float),
               Column('Hour', Integer), 
               Column('weekly_avg', Float),
              
            )
            meta.create_all(engine)

            # insert the data into the created table
            df.to_sql(name ='trips_data', con=engine, index=False, chunksize=500,if_exists='replace')
           
            account_sid = "ACa045c88bd06ec82f87f4772f1d864043"
            auth_token  = "7c82696275a674d4669865a13bb4157a"
            client = Client(account_sid,auth_token)
            from_whatsapp_number='whatsapp:+14155238886'
            to_whatsapp_number='whatsapp:+573013919941'
            client.messages.create(body='Records Inserted in Database',from_=from_whatsapp_number,to=to_whatsapp_number)
        except (NameError,TypeError,UnboundLocalError,ValueError,TwilioRestException,AttributeError,OperationalError) as error:
            account_sid = "ACa045c88bd06ec82f87f4772f1d864043"
            auth_token  = "7c82696275a674d4669865a13bb4157a"
            client = Client(account_sid,auth_token)
            from_whatsapp_number='whatsapp:+14155238886'
            to_whatsapp_number='whatsapp:+573013919941'
            client.messages.create(body=f'{error}''records can not be inserted error',from_=from_whatsapp_number,to=to_whatsapp_number)
            raise
        
    def run_dag(self):

        default_args = {
        'start_date': datetime.today().strftime('%Y-%m-%d')
        }

        with DAG ('cloud_ingesting', schedule_interval='@daily',dagrun_timeout=timedelta(minutes=1),
                default_args = default_args , catchup=False) as dag:

            is_api_available = HttpSensor(
            task_id='is_api_available',
            http_conn_id='user_api',
            endpoint='api/'
        )


            processing_data = IngestCloudOperator(
            task_id='processing_data',
            python_callable=process_data
        )


            transforming_data = IngestCloudOperator(
            task_id='transform_data',
            python_callable=transform_data
        )

            updating_data = IngestCloudOperator(
            task_id='update_data',
            python_callable=update_data
        )

            importing_data = IngestCloudOperator(
            task_id='import_data',
            python_callable=import_data
        )
        is_api_available >> processing_data >> transforming_data >> update_data >> importing_data


        return dag



import_cloud = IngestCloudOperator()
import_cloud.process_data()
import_cloud.transform_data()
import_cloud.update_data()
import_cloud.import_data()

dag = import_cloud.run_dag()

globals()['cloud_ingesting'] = dag




I am getting this error:
Broken DAG: [/home/airflow/airflow/dags/jobsity_test.py] Traceback (most recent call last):
  File "/home/airflow/airflow/dags/jobsity_test.py", line 209, in <module>
    import_cloud = IngestCloudOperator()
  File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 174, in apply_defaults
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['df', 'df2'] is required


What I have tried:

I am trying to run this DAG, but I am getting the error described above. A little help here, please.
Posted
Updated 29-Jan-22 7:27am

1 solution

Getting your code to run does not mean your code is right!
Think of the development process as writing an email: compiling successfully means that you wrote the email in the right language - English, rather than German for example - not that the email contained the message you wanted to send.

So now you enter the second stage of development (in reality it's the fourth or fifth, but you'll come to the earlier stages later): Testing and Debugging.

Start by looking at what it does, and how that differs from what you wanted. This is important because it gives you information as to why it's doing it. For example, if a program is intended to let the user enter a number, double it, and print the answer, and the input / output was like this:
Input   Expected output    Actual output
  1            2                 1
  2            4                 4
  3            6                 9
  4            8                16
Then it's fairly obvious that the problem is with the bit which doubles it - it's not adding itself to itself, or multiplying it by 2, it's multiplying it by itself and returning the square of the input.
So with that, you can look at the code and it's obvious that it's somewhere here:
C#
int Double(int value)
{
    return value * value;
}
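Since the question is in Python, here is a minimal sketch of the same comparison done in code rather than by eye. The names `buggy_double` and `find_mismatches` are made up for illustration; the test cases are the rows of the table above.

```python
def buggy_double(value: int) -> int:
    # Same mistake as the C# method above: multiplies the value by itself.
    return value * value


def find_mismatches(func, cases):
    """Return (input, expected, actual) for every case where func is wrong."""
    return [
        (x, expected, func(x))
        for x, expected in cases
        if func(x) != expected
    ]


# (input, expected output) pairs taken from the table above.
cases = [(1, 2), (2, 4), (3, 6), (4, 8)]

mismatches = find_mismatches(buggy_double, cases)
for x, expected, actual in mismatches:
    print(f"input={x} expected={expected} actual={actual}")
```

Note that an input of 2 does not show up as a mismatch, because 2 * 2 and 2 + 2 happen to agree. That is why a single passing test case proves very little.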

Once you have an idea what might be going wrong, start using the debugger to find out why. Put a breakpoint on the first line of the method, and run your app. When it reaches the breakpoint, the debugger will stop, and hand control over to you. You can now run your code line-by-line (called "single stepping") and look at (or even change) variable contents as necessary (heck, you can even change the code and try again if you need to).
Think about what each line in the code should do before you execute it, and compare that to what it actually did when you use the "Step over" button to execute each line in turn. Did it do what you expect? If so, move on to the next line.
If not, why not? How does it differ?
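As a sketch of that line-by-line comparison applied to the question's own code, the snippet below takes the `keywords` list from `transform_data` and checks what each replacement really does. It uses Python's `re` module directly, on the assumption that pandas hands the pattern to `re` when `regex=True`. The `sample` string is a hypothetical value, not taken from the real CSV, and `try_replace` is a helper made up for this example.

```python
import re

sample = "POINT (10.5 48.1)"  # hypothetical value, not from the real CSV
keywords = ["POINT", "(", ")", " "]  # same list as in transform_data


def try_replace(pattern: str, text: str) -> str:
    """Remove pattern from text, or report the regex error instead of raising."""
    try:
        return re.sub(pattern, "", text)
    except re.error as exc:
        return f"re.error: {exc}"


# What each keyword does on its own when treated as a regular expression.
results = {kw: try_replace(kw, sample) for kw in keywords}
for kw, outcome in results.items():
    print(repr(kw), "->", repr(outcome))

# Escaping each keyword makes it match literally instead.
cleaned = sample
for kw in keywords:
    cleaned = re.sub(re.escape(kw), "", cleaned)
print(repr(cleaned))
```

The bare parentheses are not valid patterns on their own, so those two replacements fail rather than remove anything. Escaping avoids the exception, but deleting the space also joins the two numbers into one string, so it is worth checking whether the later lines expect that.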
Hopefully, that should help you locate which part of that code has a problem, and what the problem is.
This is a skill, and it's one which is well worth developing as it helps you in the real world as well as in development. And like all skills, it only improves by use!
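Applied to the traceback in the question, the first thing to compare is what the constructor requires against what the call supplies. The sketch below reproduces that situation in plain Python without Airflow: `BaseTask`, `IngestTask` and `missing_required_args` are hypothetical stand-ins, and the signature check is only an assumption about the kind of test that produces "Argument ['df', 'df2'] is required".

```python
import inspect


class BaseTask:
    """Stand-in for an operator base class (hypothetical, not Airflow's)."""

    def __init__(self, task_id: str):
        self.task_id = task_id


class IngestTask(BaseTask):
    # Mirrors the question: df and df2 have no defaults, so both are required.
    def __init__(self, df, df2):
        self.df = df
        self.df2 = df2


def missing_required_args(cls, **kwargs):
    """List constructor parameters that have no default and were not passed."""
    params = inspect.signature(cls.__init__).parameters
    return sorted(
        name
        for name, param in params.items()
        if name != "self"
        and param.default is inspect.Parameter.empty
        and param.kind
        in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
        and name not in kwargs
    )


# Calling the class with no arguments, as the question does.
missing = missing_required_args(IngestTask)
print("missing:", missing)

try:
    IngestTask()
    error_message = None
except TypeError as exc:
    error_message = str(exc)
print("error:", error_message)
```

In the posted DAG the class is created without `df` and `df2` both at module level and inside `run_dag`, so every one of those calls hits the same check. One possible way out, offered as a suggestion rather than the only fix, is to drop the subclass, keep the four steps as plain functions, and pass each one as `python_callable` to a standard `PythonOperator`.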
 
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


