-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmageCabTransformation
More file actions
161 lines (142 loc) · 6.15 KB
/
mageCabTransformation
File metadata and controls
161 lines (142 loc) · 6.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import pandas as pd
if "transformer" not in globals():
from mage_ai.data_preparation.decorators import transformer
if "test" not in globals():
from mage_ai.data_preparation.decorators import test
@transformer
def transform(data, *args, **kwargs):
data["tpep_pickup_datetime"] = pd.to_datetime(data["tpep_pickup_datetime"])
data["tpep_dropoff_datetime"] = pd.to_datetime(data["tpep_dropoff_datetime"])
data = data.drop_duplicates().reset_index(drop=True)
data["trip_id"] = data.index
datetime_dim = data[["tpep_pickup_datetime", "tpep_dropoff_datetime"]].reset_index(
drop=True
)
datetime_dim["tpep_pickup_datetime"] = datetime_dim["tpep_pickup_datetime"]
datetime_dim["pick_hour"] = datetime_dim["tpep_pickup_datetime"].dt.hour
datetime_dim["pick_day"] = datetime_dim["tpep_pickup_datetime"].dt.day
datetime_dim["pick_month"] = datetime_dim["tpep_pickup_datetime"].dt.month
datetime_dim["pick_year"] = datetime_dim["tpep_pickup_datetime"].dt.year
datetime_dim["pick_weekday"] = datetime_dim["tpep_pickup_datetime"].dt.weekday
datetime_dim["tpep_dropoff_datetime"] = datetime_dim["tpep_dropoff_datetime"]
datetime_dim["drop_hour"] = datetime_dim["tpep_dropoff_datetime"].dt.hour
datetime_dim["drop_day"] = datetime_dim["tpep_dropoff_datetime"].dt.day
datetime_dim["drop_month"] = datetime_dim["tpep_dropoff_datetime"].dt.month
datetime_dim["drop_year"] = datetime_dim["tpep_dropoff_datetime"].dt.year
datetime_dim["drop_weekday"] = datetime_dim["tpep_dropoff_datetime"].dt.weekday
datetime_dim["datetime_id"] = datetime_dim.index
# datetime_dim = datetime_dim.rename(columns={'tpep_pickup_datetime': 'datetime_id'}).reset_index(drop=True)
datetime_dim = datetime_dim[
[
"datetime_id",
"tpep_pickup_datetime",
"pick_hour",
"pick_day",
"pick_month",
"pick_year",
"pick_weekday",
"tpep_dropoff_datetime",
"drop_hour",
"drop_day",
"drop_month",
"drop_year",
"drop_weekday",
]
]
passenger_count_dim = data[["passenger_count"]].reset_index(drop=True)
passenger_count_dim["passenger_count_id"] = passenger_count_dim.index
passenger_count_dim = passenger_count_dim[["passenger_count_id", "passenger_count"]]
trip_distance_dim = data[["trip_distance"]].reset_index(drop=True)
trip_distance_dim["trip_distance_id"] = trip_distance_dim.index
trip_distance_dim = trip_distance_dim[["trip_distance_id", "trip_distance"]]
rate_code_type = {
1: "Standard rate",
2: "JFK",
3: "Newark",
4: "Nassau or Westchester",
5: "Negotiated fare",
6: "Group ride",
}
rate_code_dim = data[["RatecodeID"]].reset_index(drop=True)
rate_code_dim["rate_code_id"] = rate_code_dim.index
rate_code_dim["rate_code_name"] = rate_code_dim["RatecodeID"].map(rate_code_type)
rate_code_dim = rate_code_dim[["rate_code_id", "RatecodeID", "rate_code_name"]]
pickup_location_dim = data[["pickup_longitude", "pickup_latitude"]].reset_index(
drop=True
)
pickup_location_dim["pickup_location_id"] = pickup_location_dim.index
pickup_location_dim = pickup_location_dim[
["pickup_location_id", "pickup_latitude", "pickup_longitude"]
]
dropoff_location_dim = data[["dropoff_longitude", "dropoff_latitude"]].reset_index(
drop=True
)
dropoff_location_dim["dropoff_location_id"] = dropoff_location_dim.index
dropoff_location_dim = dropoff_location_dim[
["dropoff_location_id", "dropoff_latitude", "dropoff_longitude"]
]
payment_type_name = {
1: "Credit card",
2: "Cash",
3: "No charge",
4: "Dispute",
5: "Unknown",
6: "Voided trip",
}
payment_type_dim = data[["payment_type"]].reset_index(drop=True)
payment_type_dim["payment_type_id"] = payment_type_dim.index
payment_type_dim["payment_type_name"] = payment_type_dim["payment_type"].map(
payment_type_name
)
payment_type_dim = payment_type_dim[
["payment_type_id", "payment_type", "payment_type_name"]
]
fact_table = (
data.merge(
passenger_count_dim, left_on="trip_id", right_on="passenger_count_id"
)
.merge(trip_distance_dim, left_on="trip_id", right_on="trip_distance_id")
.merge(rate_code_dim, left_on="trip_id", right_on="rate_code_id")
.merge(pickup_location_dim, left_on="trip_id", right_on="pickup_location_id")
.merge(dropoff_location_dim, left_on="trip_id", right_on="dropoff_location_id")
.merge(datetime_dim, left_on="trip_id", right_on="datetime_id")
.merge(payment_type_dim, left_on="trip_id", right_on="payment_type_id")[
[
"trip_id",
"VendorID",
"datetime_id",
"passenger_count_id",
"trip_distance_id",
"rate_code_id",
"store_and_fwd_flag",
"pickup_location_id",
"dropoff_location_id",
"payment_type_id",
"fare_amount",
"extra",
"mta_tax",
"tip_amount",
"tolls_amount",
"improvement_surcharge",
"total_amount",
]
]
)
final_data = {
"datetime_dim": datetime_dim.to_dict(orient="dict"),
"passenger_count_dim": passenger_count_dim.to_dict(orient="dict"),
"trip_distance_dim": trip_distance_dim.to_dict(orient="dict"),
"rate_code_dim": rate_code_dim.to_dict(orient="dict"),
"pickup_location_dim": pickup_location_dim.to_dict(orient="dict"),
"dropoff_location_dim": dropoff_location_dim.to_dict(orient="dict"),
"payment_type_dim": payment_type_dim.to_dict(orient="dict"),
"fact_table": fact_table.to_dict(orient="dict"),
}
# something = pd.DataFrame(final_data["fact_table"])
return final_data
@test
def test_output(output, *args) -> None:
"""
Template code for testing the output of the block.
"""
assert output is not None, "The output is undefined"