import sys

import boto3
import requests
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id
from pyspark.sql.types import DoubleType, IntegerType

def rowExpander(row):
    """Transpose a wide-format row (one column per date) into one Row per date.

    Only needed when working with the wide-format JHU time series instead of
    the narrow HXL exports downloaded below; see the commented example after
    the confirmed-cases section.
    """
    rowDict = row.asDict()
    valProvince = rowDict.pop('Province/State')
    valCountry = rowDict.pop('Country/Region')
    valLat = rowDict.pop('Lat')
    valLong = rowDict.pop('Long')
    for k in rowDict:
        yield Row(province=valProvince, country=valCountry, latitude=valLat,
                  longitude=valLong, date=k, value=rowDict[k])

# The S3 bucket name (the S3BucketName output of the CloudFormation template)
# is passed to the job as its first argument. In a Glue job the same value
# could instead be read with awsglue.utils.getResolvedOptions.
s3_bucket_name = sys.argv[1]
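
# Example invocation (script and bucket names below are illustrative):
#   spark-submit covid19_etl.py my-covid19-data-bucket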

# Public URLs for the HDX/HXL narrow-format exports (one row per location and
# date) of the JHU CSSE global time series; downloaded locally and then
# uploaded to the S3 bucket created by the CloudFormation template.
covid19_confirmed_source = 'https://data.humdata.org/hxlproxy/data/download/time_series_covid19_confirmed_global_narrow.csv?dest=data_edit&filter01=merge&merge-url01=https://docs.google.com/spreadsheets/d/e/2PACX-1vTglKQRXpkKSErDiWG6ycqEth32MY0reMuVGhaslImLjfuLU0EUgyyu2e-3vKDArjqGX7dXEBV8FJ4f/pub%3Fgid%3D1326629740%26single%3Dtrue%26output%3Dcsv&merge-keys01=%23country%2Bname&merge-tags01=%23country%2Bcode,%23region%2Bmain%2Bcode,%23region%2Bsub%2Bcode,%23region%2Bintermediate%2Bcode&filter02=merge&merge-url02=https://docs.google.com/spreadsheets/d/e/2PACX-1vTglKQRXpkKSErDiWG6ycqEth32MY0reMuVGhaslImLjfuLU0EUgyyu2e-3vKDArjqGX7dXEBV8FJ4f/pub%3Fgid%3D398158223%26single%3Dtrue%26output%3Dcsv&merge-keys02=%23adm1%2Bname&merge-tags02=%23country%2Bcode,%23region%2Bmain%2Bcode,%23region%2Bsub%2Bcode,%23region%2Bintermediate%2Bcode&merge-replace02=on&merge-overwrite02=on&filter03=explode&explode-header-att03=date&explode-value-att03=value&filter04=rename&rename-oldtag04=%23affected%2Bdate&rename-newtag04=%23date&rename-header04=Date&filter05=rename&rename-oldtag05=%23affected%2Bvalue&rename-newtag05=%23affected%2Binfected%2Bvalue%2Bnum&rename-header05=Value&filter06=clean&clean-date-tags06=%23date&filter07=sort&sort-tags07=%23date&sort-reverse07=on&filter08=sort&sort-tags08=%23country%2Bname,%23adm1%2Bname&tagger-match-all=on&tagger-default-tag=%23affected%2Blabel&tagger-01-header=province/state&tagger-01-tag=%23adm1%2Bname&tagger-02-header=country/region&tagger-02-tag=%23country%2Bname&tagger-03-header=lat&tagger-03-tag=%23geo%2Blat&tagger-04-header=long&tagger-04-tag=%23geo%2Blon&header-row=1&url=https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv'
covid19_deaths_source = 'https://data.humdata.org/hxlproxy/data/download/time_series_covid19_deaths_global_narrow.csv?dest=data_edit&filter01=merge&merge-url01=https://docs.google.com/spreadsheets/d/e/2PACX-1vTglKQRXpkKSErDiWG6ycqEth32MY0reMuVGhaslImLjfuLU0EUgyyu2e-3vKDArjqGX7dXEBV8FJ4f/pub%3Fgid%3D1326629740%26single%3Dtrue%26output%3Dcsv&merge-keys01=%23country%2Bname&merge-tags01=%23country%2Bcode,%23region%2Bmain%2Bcode,%23region%2Bsub%2Bcode,%23region%2Bintermediate%2Bcode&filter02=merge&merge-url02=https://docs.google.com/spreadsheets/d/e/2PACX-1vTglKQRXpkKSErDiWG6ycqEth32MY0reMuVGhaslImLjfuLU0EUgyyu2e-3vKDArjqGX7dXEBV8FJ4f/pub%3Fgid%3D398158223%26single%3Dtrue%26output%3Dcsv&merge-keys02=%23adm1%2Bname&merge-tags02=%23country%2Bcode,%23region%2Bmain%2Bcode,%23region%2Bsub%2Bcode,%23region%2Bintermediate%2Bcode&merge-replace02=on&merge-overwrite02=on&filter03=explode&explode-header-att03=date&explode-value-att03=value&filter04=rename&rename-oldtag04=%23affected%2Bdate&rename-newtag04=%23date&rename-header04=Date&filter05=rename&rename-oldtag05=%23affected%2Bvalue&rename-newtag05=%23affected%2Binfected%2Bvalue%2Bnum&rename-header05=Value&filter06=clean&clean-date-tags06=%23date&filter07=sort&sort-tags07=%23date&sort-reverse07=on&filter08=sort&sort-tags08=%23country%2Bname,%23adm1%2Bname&tagger-match-all=on&tagger-default-tag=%23affected%2Blabel&tagger-01-header=province/state&tagger-01-tag=%23adm1%2Bname&tagger-02-header=country/region&tagger-02-tag=%23country%2Bname&tagger-03-header=lat&tagger-03-tag=%23geo%2Blat&tagger-04-header=long&tagger-04-tag=%23geo%2Blon&header-row=1&url=https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv'
covid19_recovery_source = 'https://data.humdata.org/hxlproxy/data/download/time_series_covid19_recovered_global_narrow.csv?dest=data_edit&filter01=merge&merge-url01=https://docs.google.com/spreadsheets/d/e/2PACX-1vTglKQRXpkKSErDiWG6ycqEth32MY0reMuVGhaslImLjfuLU0EUgyyu2e-3vKDArjqGX7dXEBV8FJ4f/pub%3Fgid%3D1326629740%26single%3Dtrue%26output%3Dcsv&merge-keys01=%23country%2Bname&merge-tags01=%23country%2Bcode,%23region%2Bmain%2Bcode,%23region%2Bsub%2Bcode,%23region%2Bintermediate%2Bcode&filter02=merge&merge-url02=https://docs.google.com/spreadsheets/d/e/2PACX-1vTglKQRXpkKSErDiWG6ycqEth32MY0reMuVGhaslImLjfuLU0EUgyyu2e-3vKDArjqGX7dXEBV8FJ4f/pub%3Fgid%3D398158223%26single%3Dtrue%26output%3Dcsv&merge-keys02=%23adm1%2Bname&merge-tags02=%23country%2Bcode,%23region%2Bmain%2Bcode,%23region%2Bsub%2Bcode,%23region%2Bintermediate%2Bcode&merge-replace02=on&merge-overwrite02=on&filter03=explode&explode-header-att03=date&explode-value-att03=value&filter04=rename&rename-oldtag04=%23affected%2Bdate&rename-newtag04=%23date&rename-header04=Date&filter05=rename&rename-oldtag05=%23affected%2Bvalue&rename-newtag05=%23affected%2Binfected%2Bvalue%2Bnum&rename-header05=Value&filter06=clean&clean-date-tags06=%23date&filter07=sort&sort-tags07=%23date&sort-reverse07=on&filter08=sort&sort-tags08=%23country%2Bname,%23adm1%2Bname&tagger-match-all=on&tagger-default-tag=%23affected%2Blabel&tagger-01-header=province/state&tagger-01-tag=%23adm1%2Bname&tagger-02-header=country/region&tagger-02-tag=%23country%2Bname&tagger-03-header=lat&tagger-03-tag=%23geo%2Blat&tagger-04-header=long&tagger-04-tag=%23geo%2Blon&header-row=1&url=https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv'
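# Each export carries an HXL tag row (values such as '#country+name') directly
# below the CSV header; that row is dropped during the Spark cleanup further down.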

def download_file(url, local_path):
    # Fail fast on HTTP errors instead of silently writing an error page to disk
    response = requests.get(url, timeout=120)
    response.raise_for_status()
    with open(local_path, 'wb') as file_handle:
        file_handle.write(response.content)

download_file(covid19_confirmed_source, 'covid19_confirmed.csv')
download_file(covid19_deaths_source, 'covid19_deaths.csv')
download_file(covid19_recovery_source, 'covid19_recovery.csv')

# Upload the raw CSVs, reusing a single S3 bucket resource for all three objects
s3_bucket = boto3.Session().resource('s3').Bucket(s3_bucket_name)
s3_bucket.Object('covid_data/csv/confirmed_cases/covid_confirmed_global_data.csv') \
    .upload_file('covid19_confirmed.csv')
s3_bucket.Object('covid_data/csv/death_cases/covid_deaths_global_data.csv') \
    .upload_file('covid19_deaths.csv')
s3_bucket.Object('covid_data/csv/recovered_cases/covid_recovered_global_data.csv') \
    .upload_file('covid19_recovery.csv')
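
# upload_file uses boto3's managed transfer, so larger files are split into
# multipart uploads automatically.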

spark = SparkSession.builder \
        .appName("covid19-dataset") \
        .enableHiveSupport() \
        .getOrCreate()
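
# enableHiveSupport() makes the spark.sql() DDL below run against the Hive
# metastore (on EMR this is typically the Glue Data Catalog, depending on the
# cluster configuration), so the tables become visible to Hive and other tools.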

# Read the raw confirmed-cases CSV and normalize the column names
covid19ConfirmedRawDF = spark.read \
    .csv("s3://" + s3_bucket_name + "/covid_data/csv/confirmed_cases/covid_confirmed_global_data.csv",
         header=True, mode="DROPMALFORMED") \
    .withColumnRenamed('Province/State', 'province') \
    .withColumnRenamed('Country/Region', 'country') \
    .withColumnRenamed('Lat', 'latitude') \
    .withColumnRenamed('Long', 'longitude') \
    .drop('ISO 3166-1 Alpha 3-Codes', 'Region Code', 'Sub-region Code', 'Intermediate Region Code')

# The HXL export has a tag row directly below the header; add a row index so
# that row (index 0) can be filtered out below
covid19ConfirmedIndexedDF = covid19ConfirmedRawDF.withColumn('index', monotonically_increasing_id())
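
# Note: monotonically_increasing_id() assigns 0 to the tag row only when the
# CSV is read as a single partition, which holds for a single small file. A
# more robust alternative would be to filter on the tag value itself, e.g.
# dropping rows where the country column starts with '#'.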

# Cast the numeric columns, rename 'value' to 'count', and drop the tag row
covid19ConfirmedFilteredDF = covid19ConfirmedIndexedDF \
    .withColumn("latitude", col("latitude").cast(DoubleType())) \
    .withColumn("longitude", col("longitude").cast(DoubleType())) \
    .withColumn("count", col("value").cast(IntegerType())) \
    .drop("value") \
    .filter(col("index") > 0) \
    .drop("index")

# Sample DF and Schema
covid19ConfirmedFilteredDF.show(10)
covid19ConfirmedFilteredDF.printSchema()

# Write final results in parquet format; overwrite so the job can be rerun
covid19ConfirmedFilteredDF.write \
    .mode("overwrite") \
    .parquet("s3://" + s3_bucket_name + "/covid_data/parquet/covid_confirmed_cases")

# If the wide-format time series (one column per date) were used instead of
# the narrow export, the date columns could be transposed to rows with
# rowExpander, e.g.:
#   narrowDF = spark.createDataFrame(wideDF.rdd.flatMap(rowExpander))
#   narrowDF.show(10)

# Read the raw death-cases CSV and normalize the column names
covid19DeathsRawDF = spark.read \
    .csv("s3://" + s3_bucket_name + "/covid_data/csv/death_cases/covid_deaths_global_data.csv",
         header=True, mode="DROPMALFORMED") \
    .withColumnRenamed('Province/State', 'province') \
    .withColumnRenamed('Country/Region', 'country') \
    .withColumnRenamed('Lat', 'latitude') \
    .withColumnRenamed('Long', 'longitude') \
    .drop('ISO 3166-1 Alpha 3-Codes', 'Region Code', 'Sub-region Code', 'Intermediate Region Code')

# Add a row index so the HXL tag row can be filtered out
covid19DeathsIndexedDF = covid19DeathsRawDF.withColumn('index', monotonically_increasing_id())

# Cast the numeric columns, rename 'value' to 'count', and drop the tag row
covid19DeathsFilteredDF = covid19DeathsIndexedDF \
    .withColumn("latitude", col("latitude").cast(DoubleType())) \
    .withColumn("longitude", col("longitude").cast(DoubleType())) \
    .withColumn("count", col("value").cast(IntegerType())) \
    .drop("value") \
    .filter(col("index") > 0) \
    .drop("index")

# Sample DF and Schema
covid19DeathsFilteredDF.show(10)
covid19DeathsFilteredDF.printSchema()

# Write final results in parquet format; overwrite so the job can be rerun
covid19DeathsFilteredDF.write \
    .mode("overwrite") \
    .parquet("s3://" + s3_bucket_name + "/covid_data/parquet/covid_death_cases")

# Read the raw recovered-cases CSV and normalize the column names
covid19RecoveredRawDF = spark.read \
    .csv("s3://" + s3_bucket_name + "/covid_data/csv/recovered_cases/covid_recovered_global_data.csv",
         header=True, mode="DROPMALFORMED") \
    .withColumnRenamed('Province/State', 'province') \
    .withColumnRenamed('Country/Region', 'country') \
    .withColumnRenamed('Lat', 'latitude') \
    .withColumnRenamed('Long', 'longitude') \
    .drop('ISO 3166-1 Alpha 3-Codes', 'Region Code', 'Sub-region Code', 'Intermediate Region Code')

# Add a row index so the HXL tag row can be filtered out
covid19RecoveredIndexedDF = covid19RecoveredRawDF.withColumn('index', monotonically_increasing_id())

# Cast the numeric columns, rename 'value' to 'count', and drop the tag row
covid19RecoveredFilteredDF = covid19RecoveredIndexedDF \
    .withColumn("latitude", col("latitude").cast(DoubleType())) \
    .withColumn("longitude", col("longitude").cast(DoubleType())) \
    .withColumn("count", col("value").cast(IntegerType())) \
    .drop("value") \
    .filter(col("index") > 0) \
    .drop("index")

# Sample DF and Schema
covid19RecoveredFilteredDF.show(10)
covid19RecoveredFilteredDF.printSchema()

# Write final results in parquet format; overwrite so the job can be rerun
covid19RecoveredFilteredDF.write \
    .mode("overwrite") \
    .parquet("s3://" + s3_bucket_name + "/covid_data/parquet/covid_recovered_cases")

# Drop the Hive tables if they already exist
for table_name in ('covid_recovered_cases', 'covid_confirmed_cases', 'covid_death_cases'):
    spark.sql('DROP TABLE IF EXISTS {}'.format(table_name))

# DDL template for the external Hive tables over the Parquet output
create_table_template = """
CREATE EXTERNAL TABLE {table}(
    province string, country string, date string,
    latitude double, longitude double, count int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3a://{bucket}/covid_data/parquet/{table}'
"""

create_recovered_table = create_table_template.format(table='covid_recovered_cases', bucket=s3_bucket_name)
create_confirmed_table = create_table_template.format(table='covid_confirmed_cases', bucket=s3_bucket_name)
create_death_table = create_table_template.format(table='covid_death_cases', bucket=s3_bucket_name)

# Execute create DDL
spark.sql(create_confirmed_table)
spark.sql(create_recovered_table)
spark.sql(create_death_table)
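
# Optional sanity check: confirm the tables resolve and contain data, e.g.
# spark.sql("SELECT country, MAX(`count`) AS total FROM covid_confirmed_cases "
#           "GROUP BY country ORDER BY total DESC LIMIT 10").show()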
