#!/bin/bash ################################################################## # AMPY Incremental Tables New Code Implementation # # # # John Bonifas # # last updated 07-23-2020 # # # # | = pipe command output to another command # # # # ` = execute the command between these backticks # # and capture the results # # > = redirect and empty existing file or create new file # # >> = add stdout to an existing file # # 2>> = add stderr to an existing file # # | = sed regex delimiter character in case regex # # has reserved characters # # echo -e = if you have escaped characters # # echo -n = don't echo newlines # # {} = variable indirection # # \ = escape next character or space plus \ = line continuation # ################################################################## # GLOBALS # ####### # folders TARGETFOLDER=/development/etl/source/ampy; TESTFOLDER=/export/home/jbbonifa/scripts/lab/finaltest; PRODUCTIONTEMPFOLDER=/export/home/jbbonifa/scripts/temp; # input flat files INCOMINGDATASETFILE=thetablesplusfields_singleline.csv; #INCOMINGDATASETFILE=thetablesplusfields.csv; # vars RUNTIME=`date +%Y-%m-%d; date +%r;`; ENVIRONMENT=dev_; LPAREN="("; RPAREN=")"; BACKTICK="\`"; # Log and error files echo '' > ampynewcode_log.txt; echo '' > ampynewcode_errors.txt; # DATA TWISTING ENGINE # ==================== # https://stackoverflow.com/questions/16487258/how-to-declare-2d-array-in-bash/16487733 # Engine to twist multidimensional dataset into a BASH one-dimensional array # based on Rashok's post # map input data to an array variable mapfile -t INCOMINGDATASET < <(cat $INCOMINGDATASETFILE); TWODARRAY=(); ONEDARRAY=(); # twist (merge) the two rightmost columns into the leftmost column # TW0DARRAY => ONEDARRAY for COMMADELIMITEDROW in ${INCOMINGDATASET[@]} do # sed regex fixed. # https://unix.stackexchange.com/questions/447741/how-to-replace-a-plus-sign-with-a-space-in-sed # #:~:text=The%20y%20command%20in%20sed,does%20it%20for%20every%20occurrence. TWODARRAY+=(`echo "$COMMADELIMITEDROW" | sed 's/,/ /g'`); done col_size=3; col_count=0; for ELEMENT in ${TWODARRAY[@]}; do if [ ${col_count} -eq ${col_size} ]; then col_count=0; fi ONEDARRAY+=(`echo -e "$ELEMENT"`); ((col_count++)) done # MAIN PROCESSING LOOP # ==================== rowcount="${#ONEDARRAY[@]}"; loopcount=0; tablecounter=0; while [ $loopcount -lt $rowcount ] do # dynamic variable assignment from the dataset. TABLENAME=${ONEDARRAY[${loopcount}]}; PARTITIONFIELDDT=${ONEDARRAY[$loopcount+1]}; PRIMARYKEY=${ONEDARRAY[$loopcount+2]}; # progress line let "tablecounter++"; echo -e "\n"$tablecounter"." working on table: $TABLENAME"\n"partitionfield: $PARTITIONFIELDDT"\n"primarykey: $PRIMARYKEY | tee -a ampynewcode_log.txt; # target load script to update LOADHQLFILE=$TARGETFOLDER/$TABLENAME/load_table_ampy-${TABLENAME}_impala.hql; # for testing #TESTHQLFILE=$TESTFOLDER/$TABLENAME/load_table_ampy-${TABLENAME}_impala.hql; # https://linuxconfig.org/how-to-create-a-new-subdirectory-with-a-single-command-on-linux #mkdir -p $TESTFOLDER/$TABLENAME; #echo '' > $TESTHQLFILE; # for PRODUCTION PRODUCTIONTEMPHQLFILE=$PRODUCTIONTEMPFOLDER/load_table_ampy-${TABLENAME}_impala.hql; PRODUCTIONHQLFILE=$TARGETFOLDER/$TABLENAME/load_table_ampy-${TABLENAME}_impala.hql; PRODUCTIONHQLFOLDER=$TARGETFOLDER/$TABLENAME; echo '' > $PRODUCTIONTEMPHQLFILE; # Comments # in the final tests, I suspect that the reason why the comments were not # coming through was because of lag in HDFS itself. So if there is a next # time that this script is run, the " &&" will ensure that the script waits # until this operation has completed. # ###################################################################### LOADHQLTEXT=`hdfs dfs -cat $LOADHQLFILE 2>>ampynewcode_errors.txt` && # ###################################################################### # find the first blank line in the file OLDCOMMENTLINES=`echo "$LOADHQLTEXT" | sed -n -e '/./p;/./!q'`; # add my entry MYCOMMENTLINE="-- | 2020-07-16 | JBBONIFA | Logic change to accommodate record edits"; # merge em echo -e "$OLDCOMMENTLINES""\n""$MYCOMMENTLINE""\n" >> $PRODUCTIONTEMPHQLFILE; # Run time echo -e "-- "Created by ampy query updating script $RUNTIME"\n" >> $PRODUCTIONTEMPHQLFILE; # Refresh Lines REFRESHLINES="REFRESH ${ENVIRONMENT}raw.ampy_$TABLENAME;""\n""REFRESH ${ENVIRONMENT}ampy.$TABLENAME;""\n"; echo -e "$REFRESHLINES" >> $PRODUCTIONTEMPHQLFILE; # With Block WITHBLOCK=" with old as\n \ (\n \ select *\n \ from ${ENVIRONMENT}ampy.$TABLENAME\n \ where dt in\n \ (\n \ select distinct to_date($PARTITIONFIELDDT) as dt\n \ from ${ENVIRONMENT}raw.ampy_$TABLENAME\n \ )\n \ )\n\n \ INSERT OVERWRITE TABLE ${ENVIRONMENT}ampy.$TABLENAME PARTITION(dt)\n\n \ -- New records to add and old records to update\n \ SELECT\n" echo -e "$WITHBLOCK" >> $PRODUCTIONTEMPHQLFILE; # get table schema # there is an ampy data type that includes commas, so we use the @ sign as a delimiter instead, just for results assignment # " &&" makes it wait until the command is finished # ######################################################################################################################### RESULTS=`impala-shell -k -i srphdw010 --quiet -q "describe ${ENVIRONMENT}ampy.$TABLENAME;" -B --output_delimiter="@" | \ sed ':a;N;$!ba;s|\n||g' 2>>ampynewcode_errors.txt` && # ######################################################################################################################## IFS=$'@'; FIELDNAMES=($RESULTS); IFS=$'\n'; # top select list UNIONALIAS="new"; for ((fieldlooptop=0;fieldlooptop< ${#FIELDNAMES[@]} ;fieldlooptop+=2)); do if [ ${FIELDNAMES[fieldlooptop]} != "dt" ]; then TEMPNAME=`echo ${FIELDNAMES[fieldlooptop]} | sed "s|timestamp|${BACKTICK}timestamp${BACKTICK}|g;s|row|${BACKTICK}row${BACKTICK}|g"`; echo "cast"$LPAREN"$UNIONALIAS.${TEMPNAME} AS ${FIELDNAMES[fieldlooptop+1]}"$RPAREN" as ${TEMPNAME}""," >> $PRODUCTIONTEMPHQLFILE; fi done echo -e "\n" >> $PRODUCTIONTEMPHQLFILE; # Top Join JOINTOP=" to_date(new.$PARTITIONFIELDDT) AS dt\n \ FROM ${ENVIRONMENT}raw.ampy_$TABLENAME as new\n \ LEFT JOIN old\n \ ON nvl(cast(new.$PRIMARYKEY AS int),0) = nvl(old.$PRIMARYKEY,0)\n" echo -e "$JOINTOP" >> $PRODUCTIONTEMPHQLFILE; # Union All Statements echo -e "UNION ALL\n\n--Old records to retain (no update)\nSELECT\n" >> $PRODUCTIONTEMPHQLFILE; # bottom select list UNIONALIAS="old"; for ((fieldloopbottom=0;fieldloopbottom< ${#FIELDNAMES[@]} ;fieldloopbottom+=2)); do if [ ${FIELDNAMES[fieldloopbottom]} != "dt" ]; then TEMPNAME=`echo ${FIELDNAMES[fieldloopbottom]} | sed "s|timestamp|${BACKTICK}timestamp${BACKTICK}|g;s|row|${BACKTICK}row${BACKTICK}|g"`; echo "cast"$LPAREN"$UNIONALIAS.${TEMPNAME} AS ${FIELDNAMES[fieldloopbottom+1]}"$RPAREN" as ${TEMPNAME}""," >> $PRODUCTIONTEMPHQLFILE; fi done echo -e "\n" >> $PRODUCTIONTEMPHQLFILE; # Bottom Join JOINBOTTOM=" to_date(old.$PARTITIONFIELDDT) AS dt\n\n \ FROM old\n\n \ LEFT JOIN ${ENVIRONMENT}raw.ampy_$TABLENAME as new\n \ ON nvl(cast(new.$PRIMARYKEY AS int),0) = nvl(old.$PRIMARYKEY,0)\n" echo -e "$JOINBOTTOM" >> $PRODUCTIONTEMPHQLFILE; # final clause echo "WHERE new.$PRIMARYKEY IS NULL;" >> $PRODUCTIONTEMPHQLFILE; # ####### save the built file to the ampy etl folders and fix its permissions ########################### hdfs dfs -copyFromLocal -f $PRODUCTIONTEMPHQLFILE $PRODUCTIONHQLFOLDER && hdfs dfs -chmod 775 $PRODUCTIONHQLFILE; # ####################################################################################################### loopcount=`expr $loopcount + 3`; done