Hi Everyone,

Please help me write code that copies data from Cloud Storage to BigQuery. The requirement is that only the delta gets written to the BigQuery table; rows that already exist in the table should not be inserted again.
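
For context, a common pattern for this kind of delta load is to first load the file into a staging table and then MERGE it into the target, so only rows with new keys get inserted. A minimal sketch of that idea follows; the load_delta helper, the staging table, and the use of id as the merge key are assumptions, not part of the code below.

Python
from google.cloud import bigquery

def load_delta(uri, staging_table, target_table):
    '''Load a file into a staging table, then MERGE only new rows into the target.'''
    client = bigquery.Client()

    # 1. Replace the staging table's contents with the incoming file.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        autodetect=True,
    )
    client.load_table_from_uri(uri, staging_table, job_config=job_config).result()

    # 2. Insert only the rows whose key is not already present in the target.
    merge_sql = """
        MERGE `{target}` T
        USING `{staging}` S
        ON T.id = S.id
        WHEN NOT MATCHED THEN
          INSERT ROW
    """.format(target=target_table, staging=staging_table)
    client.query(merge_sql).result()

If updated rows also need to be carried over, a WHEN MATCHED THEN UPDATE clause can be added on the same key.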


I tried the code below, but it just copies the whole file into BQ and keeps appending the same rows every time I push a file. Please help, I am on a deadline :(

What I have tried:

Python
import os
import sys
import pandas as pd
import logging
from datetime import timedelta
from google.cloud import bigquery
import pandas_gbq
import json
import traceback
from datetime import datetime
from google.api_core import retry
from google.cloud import storage


# Get ENV variables
BQ_PROJECT_ID = os.getenv('bq_project')
BQ_DATASET_ID = os.getenv('BQ_DATASET_ID')
SOURCE_LANDING_BUCKET = os.getenv('source_bucket')
# DESTINATION_BUCKET is used below but was never set in the original snippet;
# the env var name here is an assumption.
DESTINATION_BUCKET = os.getenv('destination_bucket')
ML_SCORING_TABLE_NAME = 'country_new'
JOB_LOG_TABLE_NAME = 'updated_country_new'
UPDATE_TABLE_NAME = 'update_table'
DELETE_NEW_TABLE_NAME = 'delete_table'

ML_SCORING_TABLE = "{}.{}.{}".format(BQ_PROJECT_ID, BQ_DATASET_ID, ML_SCORING_TABLE_NAME)
JOB_LOG_TABLE = "{}.{}.{}".format(BQ_PROJECT_ID, BQ_DATASET_ID, JOB_LOG_TABLE_NAME)
DELETE_NEW_TABLE = "{}.{}".format(BQ_DATASET_ID, DELETE_NEW_TABLE_NAME)
UPDATE_TABLE = "{}.{}".format(BQ_DATASET_ID, UPDATE_TABLE_NAME)
DEL_NEW_FILE_NAME = "del_new"
UPDATE_FILE_NAME = "update"
FORMAT = '%(levelname)s [%(asctime)s]: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT,
                    datefmt='%Y/%m/%d %H:%M:%S')
# "logger" and "date_string" are used further down but were never defined in the
# original snippet; the names are kept and the date format is an assumption.
logger = logging.getLogger(__name__)
date_string = datetime.now().strftime('%Y%m%d')

# Cloud Storage client
CS = storage.Client()
# BigQuery client
BQ = bigquery.Client()


def bq_load(data, context):
    '''This function is executed whenever a file is added to the Cloud Storage landing bucket'''

    file_name = data['name']
    table_name = file_name.split(".")[0]
    file_extension = str(file_name.split(".")[1])

    # Check the file extension and dispatch accordingly
    if(file_extension.lower() == "avro"):
        message = 'Perform bq load with file movement, file : \'%s\'' % (file_name)
        logging.info(message)
        _load_table_from_avro(table_name, file_name)
    elif(file_extension.lower() == "png"):
        message = 'Perform file movement only, file : \'%s\'' % (file_name)
        logging.info(message)
        source_bucket_name = SOURCE_LANDING_BUCKET
        destination_bucket_name = DESTINATION_BUCKET
        _move_file(file_name, source_bucket_name, destination_bucket_name)
    elif(file_extension.lower() == "parquet"):
        message = 'Perform bq load, file : \'%s\'' % (file_name)
        logging.info(message)
        _load_table_from_parquet(table_name, file_name)
    elif(file_extension.lower() == "csv"):
        message = 'Perform bq load, file : \'%s\'' % (file_name)
        logging.info(message)
        _load_table_from_csv(table_name, file_name)
    elif(file_extension.lower() == "json"):
        message = 'Perform bq load, file : \'%s\'' % (file_name)
        logging.info(message)
        source_bucket_name = SOURCE_LANDING_BUCKET
        _load_table_from_json(table_name, file_name, source_bucket_name)
    else:
        message = 'Not supported file format, file : \'%s\'' % (file_name)
        logging.info(message)
		

def _move_file(file_name,source_bucket_name,destination_bucket_name):
    '''This function performs the file movement'''
    
    source_bucket = CS.get_bucket(source_bucket_name)
    source_blob = source_bucket.blob(file_name)
    
    destination_bucket = CS.get_bucket(destination_bucket_name)

    source_bucket.copy_blob(source_blob, destination_bucket, file_name)
    source_blob.delete()

    logging.info('File \'%s\' moved from \'%s\' to \'%s\'',
                 file_name,
                 source_bucket_name,
                 destination_bucket_name
                 )
def _if_tbl_exists(table_ref):
    ''' This function checks whether a BigQuery table exists '''
    from google.cloud.exceptions import NotFound
    try:
        BQ.get_table(table_ref)
        return True
    except NotFound:
        return False
		
		
def _load_table_from_parquet(table_name, file_name):
    '''Loads a Parquet file from the landing bucket into BigQuery'''
    table_id = "%s.%s.%s" % (BQ_PROJECT_ID, BQ_DATASET_ID, table_name)
    message = 'Table_id : \'%s\'' % (table_id)
    logging.info(message)

    if(_if_tbl_exists(table_id)):
        destination_table = BQ.get_table(table_id)
        num_rows_added = destination_table.num_rows
    else:
        num_rows_added = 0

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        source_format=bigquery.SourceFormat.PARQUET,
    )
    uri = 'gs://%s/%s' % (SOURCE_LANDING_BUCKET, file_name)
    try:
        load_job = BQ.load_table_from_uri(
            uri, table_id, job_config=job_config
        )  # Make an API request.

        load_job.result()  # Waits for the job to complete.
        destination_table = BQ.get_table(table_id)
        logging.info('Table \'%s\' loaded with \'%s\' rows ',
                     table_id,
                     destination_table.num_rows - num_rows_added
                     )
        start_delta_calculator()
    except Exception:
        error_message = 'Failed to load file : \'%s\'' % (file_name)
        logging.error(error_message)

    print("Job finished.")
		
		

def _handle_error():
    message = 'Error loading file. Cause: %s' % (traceback.format_exc())
    print(message)


class BigQueryError(Exception):
    '''Exception raised whenever a BigQuery error happened''' 

    def __init__(self, errors):
        super().__init__(self._format(errors))
        self.errors = errors

    def _format(self, errors):
        err = []
        for error in errors:
            err.extend(error['errors'])
        return json.dumps(err)
		
		
def _load_table_from_json(table_name, file_name, source_bucket_name):
    '''Loads a newline-delimited JSON file from the landing bucket into BigQuery'''
    table_id = "%s.%s.%s" % (BQ_PROJECT_ID, BQ_DATASET_ID, table_name)
    message = 'Table_id : \'%s\'' % (table_id)
    logging.info(message)

    if(_if_tbl_exists(table_id)):
        destination_table = BQ.get_table(table_id)
        num_rows_added = destination_table.num_rows
    else:
        num_rows_added = 0

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("id", "INTEGER"),
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("post_abbr", "STRING"),
        ],
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )
    uri = 'gs://%s/%s' % (SOURCE_LANDING_BUCKET, file_name)
    try:
        load_job = BQ.load_table_from_uri(
            uri, table_id, job_config=job_config
        )  # Make an API request.

        load_job.result()  # Waits for the job to complete.
        destination_table = BQ.get_table(table_id)
        logging.info('Table \'%s\' loaded with \'%s\' rows ',
                     table_id,
                     destination_table.num_rows - num_rows_added
                     )
        start_delta_calculator()
    except Exception:
        error_message = 'Failed to load file : \'%s\'' % (file_name)
        logging.error(error_message)

    print("Job finished.")


def start_delta_calculator():
    logger.info("date_string {}".format(date_string))
    logger.info("load tables")
    ml_scoring = load_dataframes(ML_SCORING_TABLE)
    logger.info("ml_scoring")
    print(ml_scoring)
    ml_scoring_last_load = load_hist_table()
    logger.info("ml_scoring_last_load")
    print(ml_scoring_last_load)

    logger.info("drop Nas")
    ml_scoring_last_load_cleaned = drop_na_rows(ml_scoring_last_load)
    logger.info("ml_scoring_last_load_cleaned")
    print(ml_scoring_last_load_cleaned)
    ml_scoring_cleaned = drop_na_rows(ml_scoring)
    logger.info("ml_scoring_cleaned")
    print(ml_scoring_cleaned)

    logger.info("joinDataframe")
    joined_df = join_dataframe(ml_scoring_cleaned, ml_scoring_last_load_cleaned)
    logger.info("joined_df")
    print(joined_df)

    logger.info("get newCustomers")
    new_customers = get_new_customers(joined_df, ml_scoring_cleaned.shape[1])
    logger.info("new_customers")
    print(new_customers)

    logger.info("get delCustomers")
    del_customers = get_del_customers(joined_df, ml_scoring_cleaned.shape[1])
    logger.info("del_customers")
    print(del_customers)

    logger.info("union delete and new customers")
    del_new_customers = pd.concat([new_customers, del_customers])
    logger.info("del_new_customers")
    print(del_new_customers)

    if not del_new_customers.empty:
        write_to_bq(del_new_customers, DELETE_NEW_TABLE, DELETE_NEW_TABLE_NAME, DEL_NEW_FILE_NAME)

    logger.info("get overlap _customers")
    overlap_customers = get_overlap_customers(joined_df)
    logger.info("overlap_customers")
    print(overlap_customers)

    logger.info("get updateCustomers")
    update_customers = generate_update_customers_column_delta(overlap_customers, ml_scoring)
    logger.info("update_customers")
    print(update_customers)

    if not update_customers.empty:
        logger.info("write updateCustomers to gcp")
        write_to_bq(update_customers, UPDATE_TABLE, UPDATE_TABLE_NAME, UPDATE_FILE_NAME)

def load_hist_table():
    client = bigquery.Client()
    query_job = client.query(
        """BEGIN
            DECLARE last_date_ml_scoring Timestamp;
            SET last_date_ml_scoring = (SELECT max(run_date) 
            FROM `{}` where job_name='DELTA' and success=true);
            SELECT * FROM `{}`  
            FOR SYSTEM_TIME AS OF last_date_ml_scoring;
        END;""".format(JOB_LOG_TABLE, ML_SCORING_TABLE)
    )
    return query_job.result().to_dataframe()


def load_dataframes(table_path):
    client = bigquery.Client()
    df = client.list_rows(table_path).to_dataframe()
    return df


def drop_na_rows(data_frame):
    table_columns = data_frame.columns.values.tolist()
    all_columns = table_columns[2:len(table_columns) - 1]
    return data_frame.dropna(how='all', subset=all_columns)


def join_dataframe(updateTable, oldTable):
    logger.info("join update and old table")
    return pd.merge(updateTable, oldTable, on=['name', 'post_abbr'], how='outer', indicator=True,
                    suffixes=('', '_old'))


def get_new_customers(joined_df, size):
    return joined_df.loc[(joined_df['_merge'] == "left_only")].iloc[:, 0:size]


def get_del_customers(joined_df, size):
    return joined_df.loc[(joined_df['_merge'] == "right_only")].iloc[:, 0:size]


def get_overlap_customers(joined_df):
    return joined_df.loc[(joined_df['_merge'] == "both")].iloc[:, 0:(joined_df.shape[1] - 1)]


def generate_update_customers_column_delta(updateCustomers, mlScoringResult):
    column_list = mlScoringResult.columns.values.tolist()[0:1]

    for i in updateCustomers.columns[3:mlScoringResult.shape[1] - 1]:
        changed = (updateCustomers[i].equals(updateCustomers[i + "_old"]))
        if changed == False:
            column_list.append(i)

    column_list.append("last_updt_timestamp")
    print(column_list)
    print(len(column_list))

    if(len(column_list) > 4):
        return updateCustomers.filter(items=column_list)
    else:
        return pd.DataFrame()

def write_to_bq(dataframe, table, table_name, file_type):
    logger.info("write {} to bq".format(file_type))
    # "table" is already "<dataset>.<table>", which is the form pandas_gbq expects.
    pandas_gbq.to_gbq(dataframe, table, project_id=BQ_PROJECT_ID, if_exists='append')

    client = bigquery.Client()
    # The file naming pattern is an assumption; the original format string had no placeholders.
    file_name = "{}_{}_country_new.json".format(date_string, file_type)
    # BUCKET_NAME and JOB_NAME were never defined in this snippet, so the export
    # path only uses the bucket that was hard-coded in the original URI.
    destination_uri = "gs://df_cdc_bucket/{}".format(file_name)
    dataset_ref = bigquery.DatasetReference(BQ_PROJECT_ID, BQ_DATASET_ID)
    table_ref = dataset_ref.table(table_name)

    # Export as newline-delimited JSON to match the .json file name (the default
    # extract format is CSV). LOCATION was never defined in the snippet, so no
    # explicit job location is passed.
    extract_config = bigquery.ExtractJobConfig(
        destination_format=bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON
    )
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        job_config=extract_config,
    )
    extract_job.result()

    print(
        "Exported {}:{}.{} to {}".format(BQ_PROJECT_ID, BQ_DATASET_ID, table_name, destination_uri)
    )


def update_job_log_table():
    logger.info("update_job_log_table")
    client = bigquery.Client()
    query_string = """INSERT INTO `{}` VALUES ('DELTA', true, current_timestamp());""".format(JOB_LOG_TABLE)
    print(query_string)
    client.query(query_string).result()  # wait for the insert so failures surface
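
Note that update_job_log_table() is never called anywhere above, even though load_hist_table() relies on the 'DELTA' entry it writes to find the last snapshot timestamp. Something along these lines at the end of start_delta_calculator() would close that loop (a sketch; the exact placement is an assumption):

Python
    # After the delta tables have been written, record the successful run so the
    # next execution of load_hist_table() can find its snapshot point.
    update_job_log_table()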