"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.emr_containers_airflow_plugin import EmrContainersStartJobRun
from airflow.sensors.emr_containers_airflow_plugin import EmrContainersJobRunSensor
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from io import BytesIO
import boto3, zipfile, gzip, random, string

# --- User-editable deployment settings (placeholders replaced at setup time) ---
afbucket='AIRFLOW_BUCKET'                        # S3 bucket backing this MWAA/Airflow environment
emr_virtual_cluster_id='VIRTUAL_CLUSTER_ID'      # EMR-on-EKS virtual cluster to submit the Spark job to
emr_execution_role_arn='EMR_EXECUTION_ROLE_ARN'  # IAM role the EMR job runs as
YR='2020'                                        # year of trip data to process (used as a key prefix below)

bucket=afbucket
SRC_BUCKET = 'tripdata'                  # public Citibike trip-data bucket
SRC_KEY = '-citibike-tripdata.csv.zip'   # suffix; prefixed with YYYYMM per month in the DAG loop
DEST_BUCKET = bucket
DEST_KEY= '-citibike-tripdata.csv'       # suffix of the unzipped object written to DEST_BUCKET

# Captured once at parse time; feeds the DAG's start_date below.
now = datetime.now()

def find_max_month():
    """Return how many monthly NYC trip-data files for year ``YR`` exist in SRC_BUCKET.

    Counts keys that contain ``YR`` and do not contain ``'JC'`` (Jersey City
    files share the bucket and are excluded). The count is used by the DAG as
    the highest month number to process.

    Returns:
        int: number of matching keys (0 if the bucket has none).
    """
    conn = boto3.client('s3')  # again assumes boto.cfg setup, assume AWS S3
    mo = 0
    # list_objects_v2 is the non-deprecated listing API; .get() avoids a
    # KeyError when the listing is empty ('Contents' is absent then).
    # NOTE(review): a single call returns at most 1000 keys — enough for one
    # year of monthly files, but switch to a paginator if the bucket grows.
    for key in conn.list_objects_v2(Bucket=SRC_BUCKET).get('Contents', []):
        if 'JC' not in key['Key'] and YR in key['Key']:
            mo += 1
    print('returning max month {}'.format(mo))
    return mo

def copy_and_unzip_s3(**context):
    """Download a zip from S3 and re-upload its contents uncompressed.

    Reads ``context['bucket']``/``context['key']``, unzips it in memory, and
    uploads each member to ``context['destbucket']``/``context['destkey']``.

    NOTE: every member is written to the same destination key, so the archive
    is expected to contain exactly one data file (plus ignorable ``__MACOSX``
    metadata entries, which are skipped).

    Raises:
        Exception: re-raises any upload failure so the Airflow task fails
        and its configured retries take effect.
    """
    s3_resource = boto3.resource('s3')
    zip_obj = s3_resource.Object(bucket_name=context['bucket'], key=context['key'])
    # The whole archive is buffered in memory; fine for monthly CSV zips.
    buffer = BytesIO(zip_obj.get()["Body"].read())
    with zipfile.ZipFile(buffer) as z:
        print('downloaded zip {}, zipObj {}'.format(z, zipfile))
        for filename in z.namelist():
            # Skip archive metadata such as '__MACOSX/...'.
            if filename.startswith("__"):
                continue
            file_info = z.getinfo(filename)
            print('iterating over zip {}, zipObj {}'.format(filename, file_info))
            try:
                with z.open(filename) as member:
                    s3_resource.meta.client.upload_fileobj(
                        member,
                        Bucket=context['destbucket'],
                        Key=context['destkey']
                    )
                print('uploaded to s3 {}'.format(filename))
            except Exception as e:
                # Log and re-raise: swallowing the error here would let the
                # Airflow task report success even though the upload failed,
                # and the DAG's retries would never run.
                print(e)
                raise

def list_bucket(**context):
    """Print every ``csv.gz`` key in ``context['destbucket']``.

    Used as a verification step after the Spark job has written its
    gzip-compressed output.
    """
    conn = boto3.client('s3')  # again assumes boto.cfg setup, assume AWS S3
    # list_objects_v2 is the non-deprecated listing API; .get() avoids a
    # KeyError when the bucket has no matching objects yet.
    for key in conn.list_objects_v2(Bucket=context['destbucket']).get('Contents', []):
        if 'csv.gz' in key['Key']:
            print(key['Key'])

# Defaults applied to every task in the DAG below; individual tasks may override.
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,          # each run is independent of prior runs
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,                      # transient S3/EMR errors are retried
    'retry_delay': timedelta(minutes=1),
}

# [START howto_operator_emr_containers_start_job_run]
# Spark job definition passed to EmrContainersStartJobRun: entry-point script
# lives in the Airflow bucket; the data bucket name is its only argument.
JOB_DRIVER = {"sparkSubmitJobDriver": {
    "entryPoint": "s3://" + afbucket + "/citibike-spark-all.py",
    "entryPointArguments": [bucket],
    "sparkSubmitParameters": "--conf spark.executor.instances=3 --conf "
                             "spark.executor.memory=4G --conf spark.driver.memory=2G --conf spark.executor.cores=2 "
                             "--conf spark.sql.shuffle.partitions=60 --conf spark.dynamicAllocation.enabled=false"}}

# Job monitoring: CloudWatch log group/stream, persistent Spark UI, and an
# S3 prefix (in the Airflow bucket) for raw job logs.
CONFIGURATION_OVERRIDES = {
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/emr-containers/jobs",
            "logStreamNamePrefix": "blog"
        },
        "persistentAppUI": "ENABLED",
        "s3MonitoringConfiguration": {
            "logUri": "s3://" + afbucket + "/joblogs"
        }
    }
}
# [END howto_operator_emr_containers_start_job_run]

# NOTE(review): start_date is recomputed on every scheduler parse; Airflow
# recommends a fixed start_date. Kept as-is since schedule_interval is None
# (manual trigger only), so it does not affect scheduling here.
with DAG(
    dag_id='Citibike_Ridership_Analytics',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=datetime(now.year,now.month,now.day,now.hour),
    schedule_interval=None,
    tags=['S3','Citibike','EMR on EKS','Spark'],
) as dag:

  start = DummyOperator(
    task_id='start',
    dag=dag)

  # Fan-in task: lists the Spark job's output after all copy tasks finish.
  listBucket = PythonOperator(
     task_id='list_transformed_files',
     python_callable=list_bucket,
     op_kwargs={'destbucket': DEST_BUCKET},
     dag=dag)

  # One copy/unzip task per month of available data.
  # NOTE(review): find_max_month() makes an S3 call at DAG-parse time, which
  # slows scheduler parsing — acceptable for a demo, but worth confirming.
  for i in range(1,find_max_month()+1):
   NEW_SRC_KEY=YR+str(i).zfill(2)+SRC_KEY
   NEW_DEST_KEY='citibike/csv/' + YR + str(i).zfill(2) + DEST_KEY
   copyAndTransformS3File = PythonOperator(
       task_id='copy_and_unzip_s3_' + str(i).zfill(2),
       python_callable=copy_and_unzip_s3,
       op_kwargs={'bucket': SRC_BUCKET, 'key': NEW_SRC_KEY, 'destbucket': DEST_BUCKET, 'destkey': NEW_DEST_KEY},
       dag=dag)

   start >> copyAndTransformS3File >> listBucket

  #[START howto_operator_emr_containers_start_job_run_tasks]
  # BUG FIX: these two tasks were previously indented inside the month loop,
  # so operators with the same task_id were re-created on every iteration
  # (DuplicateTaskIdFound on Airflow 2.x). They are now created exactly once.
  start_job_run = EmrContainersStartJobRun(
       task_id='start_citibike_ridership_analytics',
       name='citibike_analytics_run',
       virtual_cluster_id=emr_virtual_cluster_id,
       client_token=''.join(random.choice(string.digits) for _ in range(10)),
       execution_role_arn=emr_execution_role_arn,
       release_label='emr-6.2.0-latest',
       job_driver=JOB_DRIVER,
       configuration_overrides=CONFIGURATION_OVERRIDES,
       aws_conn_id='aws_default',
  )

  # Polls the job run started above; both ids are pulled from XCom.
  job_sensor = EmrContainersJobRunSensor(
       task_id='check_job_status',
       id="{{ task_instance.xcom_pull(task_ids='start_citibike_ridership_analytics', key='return_value') }}",
       virtual_cluster_id="{{ task_instance.xcom_pull(task_ids='start_citibike_ridership_analytics', key='virtual_cluster_id') }}",
       aws_conn_id='aws_default',
  )
  #[END howto_operator_emr_containers_start_job_run_tasks]

  listBucket >> start_job_run >> job_sensor
