user()
bucket()
long_trips()
manhattan_trips()
weighted_profit()
final_output()
load_data()
main()
sc
#### DO NOT CHANGE ANYTHING IN THIS CELL ####
from pyspark.sql.functions import col
def load_data(size='small'):
    """Load the trip records and the zone lookup table for this question.

    Do not change this function.

    Both datasets are read as CSV (with a header row and schema inference)
    from the hard-coded S3 bucket, using a module-level ``spark`` object that
    is not defined in this cell -- presumably the notebook's SparkSession.
    The number of trip records is printed as a side effect.

    Args:
        size: Dataset to load; only 'small' or 'large' are accepted.

    Returns:
        A ``(trips, lookup)`` tuple, or ``None`` (after printing a message)
        when ``size`` is neither 'small' nor 'large'.
    """
    # Loads the data for this question. Do not change this function.
    # This function should only be called with the parameter 'small' or 'large'
    if size != 'small' and size != 'large':
        print("Invalid size parameter provided. Use only 'small' or 'large'.")
        # NOTE(review): a bare return yields None, so a caller that unpacks
        # the result into two names will raise TypeError on an invalid size.
        return
    input_bucket = "s3://lab11-janedoe3"
    # Load Trip Data
    trip_path = '/'+size+'/yellow_tripdata*'
    trips = spark.read.csv(input_bucket + trip_path, header=True, inferSchema=True)
    print("Trip Count: ",trips.count()) # Prints # of trips (# of records, as each record is one trip)
    # Load Lookup Data
    lookup_path = '/'+size+'/taxi*'
    lookup = spark.read.csv(input_bucket + lookup_path, header=True, inferSchema=True)
    return trips, lookup
def main(size, bucket):
    """Run the whole pipeline for one dataset size and write the result as CSV.

    Prints the username, loads the data, filters to long trips, derives the
    popular Manhattan drop-off locations, computes the weighted profit per
    pickup location, builds the final table, shows it, and writes it out.

    Args:
        size: Dataset size forwarded to ``load_data`` ('small' or 'large').
        bucket: Destination path handed to ``final.write.csv``.
    """
    # Runs your functions implemented above.
    print(user())
    trips, lookup = load_data(size=size)
    # From here on ``trips`` refers to the filtered (long) trips only; both
    # manhattan_trips and weighted_profit receive this filtered frame.
    trips = long_trips(trips)
    mtrips = manhattan_trips(trips, lookup)
    wp = weighted_profit(trips, mtrips)
    final = final_output(wp,lookup)
    # Outputs the results for you to visually see
    final.show()
    # Writes out as a CSV to your bucket.
    # NOTE(review): no save mode is set, so this presumably fails if the
    # destination path already exists (e.g. on a re-run) -- confirm.
    final.write.csv(bucket)
user() function: This function should return your username, e.g. janedoe3.
def user():
    """Return the submitter's username as a string."""
    username = 'janedoe3'
    return username
long_trips() function: This function filters trips to keep only trips longer than 2 miles.
def long_trips(trips):
    """Keep only the trips that are longer than 2 miles.

    Args:
        trips: DataFrame of trip records.

    Returns:
        A DataFrame with the same schema as ``trips``, restricted to rows
        whose distance is strictly greater than 2.
    """
    # NOTE(review): assumes the distance column is named ``trip_distance``
    # and is expressed in miles (NYC TLC yellow-taxi schema) -- confirm
    # against the loaded data.
    return trips.filter(col("trip_distance") > 2)
manhattan_trips() function: This function determines the top 20 locations with a DOLocationID in Manhattan, ranked by passenger_count (pcount).
Example output formatting:
+--------------+--------+
| DOLocationID | pcount |
+--------------+--------+
| 5| 15|
| 16| 12|
+--------------+--------+
def manhattan_trips(trips, lookup):
    """Find the 20 Manhattan drop-off locations with the most passengers.

    Args:
        trips: DataFrame of trip records.
        lookup: DataFrame mapping location ids to Borough and Zone.

    Returns:
        A DataFrame with schema ``DOLocationID, pcount`` holding at most 20
        rows, sorted by ``pcount`` in descending order.
    """
    # pyspark is already a dependency of this file; imported locally so the
    # frozen import cell does not need to change.
    from pyspark.sql import functions as F

    # NOTE(review): assumes the lookup key column is ``LocationID`` and the
    # trips carry ``passenger_count`` (NYC TLC schema) -- confirm.
    manhattan_zones = (
        lookup.filter(col("Borough") == "Manhattan")
        .select(col("LocationID").alias("DOLocationID"))
    )

    # A semi join keeps only trips columns and cannot duplicate trip rows,
    # even if the lookup table contained repeated location ids.
    manhattan_dropoffs = trips.join(
        manhattan_zones, on="DOLocationID", how="left_semi"
    )

    return (
        manhattan_dropoffs.groupBy("DOLocationID")
        .agg(F.sum("passenger_count").alias("pcount"))
        .orderBy(col("pcount").desc())
        .limit(20)
    )
weighted_profit() function: For each pickup location, this function should determine the average total_amount, the total count of trips, and the total count of trips ending in the top 20 destinations, and return the weighted_profit as discussed in the homework document.
Example output formatting:
+--------------+-------------------+
| PULocationID | weighted_profit |
+--------------+-------------------+
| 18| 33.784444421924436|
| 12| 21.124577637149223|
+--------------+-------------------+
def weighted_profit(trips, mtrips):
    """Compute a weighted profit value for every pickup location.

    For each ``PULocationID`` this determines the average ``total_amount``,
    the total number of trips, and the number of trips whose drop-off is one
    of the locations in ``mtrips``.

    Args:
        trips: DataFrame of trip records.
        mtrips: DataFrame with a ``DOLocationID`` column listing the popular
            drop-off locations.

    Returns:
        A DataFrame with schema ``PULocationID, weighted_profit``.
    """
    # pyspark is already a dependency of this file; imported locally so the
    # frozen import cell does not need to change.
    from pyspark.sql import functions as F

    # Flag column is non-null only for trips ending at a popular location;
    # distinct() guards against the left join multiplying trip rows.
    popular = (
        mtrips.select("DOLocationID")
        .distinct()
        .withColumn("is_popular", F.lit(1))
    )
    flagged = trips.join(popular, on="DOLocationID", how="left")

    per_pickup = flagged.groupBy("PULocationID").agg(
        F.avg("total_amount").alias("avg_total_amount"),
        F.count("*").alias("trip_count"),
        # count(column) ignores nulls, so this counts popular drop-offs only.
        F.count("is_popular").alias("popular_trip_count"),
    )

    # NOTE(review): the formula is defined in the homework document, which is
    # not visible here. This assumes weighted_profit = average total_amount
    # * (popular-destination trips / all trips) -- confirm before submitting.
    popular_share = col("popular_trip_count") / col("trip_count")
    return per_pickup.select(
        "PULocationID",
        (col("avg_total_amount") * popular_share).alias("weighted_profit"),
    )
final_output() function: This function takes the results of weighted_profit, links them to the borough and zone, and returns the top 20 locations with the highest weighted_profit.
Example output formatting:
+------------+---------+-------------------+
| Zone | Borough | weighted_profit |
+------------+---------+-------------------+
| JFK Airport| Queens| 16.95897820117925|
| Jamaica| Queens| 14.879835188762488|
+------------+---------+-------------------+
def final_output(calc, lookup):
    """Attach zone and borough names and keep the 20 most profitable rows.

    Args:
        calc: DataFrame with schema ``PULocationID, weighted_profit``.
        lookup: DataFrame mapping location ids to Borough and Zone.

    Returns:
        A DataFrame with schema ``Zone, Borough, weighted_profit`` holding at
        most 20 rows, sorted by ``weighted_profit`` in descending order.
    """
    # NOTE(review): assumes the lookup key column is ``LocationID`` (NYC TLC
    # taxi zone lookup) -- confirm against the loaded data.
    joined = calc.join(
        lookup, calc["PULocationID"] == lookup["LocationID"], "inner"
    )

    # Sort after the join: a join does not preserve row order, so ordering
    # beforehand would not guarantee a sorted result.
    return (
        joined.select("Zone", "Borough", "weighted_profit")
        .orderBy(col("weighted_profit").desc())
        .limit(20)
    )
Update the cell below with the address of your bucket, then run it to execute your code and store the results in S3.
When you have confirmed the results of the small dataset, run it again using the large dataset. Your output file will appear in a folder in your S3 bucket called YOUROUTPUT.csv, as a CSV file with a name something like part-0000-4d992f7a-0ad3-48f8-8c72-0022984e4b50-c000.csv. Download this file and rename it to output.csv for submission. Do not make any other changes to the file.
# Destination path that main() hands to the CSV writer.
bucket = "s3://lab11-janedoe3/output-small"
# Run the pipeline against the small dataset.
main(size="small", bucket=bucket)
# main('large', bucket)