Use pyarrow-backed DataFrame with read_csv #283

Open · wants to merge 6 commits into base: main
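The changes below all follow one pattern: pass `engine="pyarrow"` so pandas parses CSVs with pyarrow's multithreaded reader, and pass `dtype_backend="pyarrow"` so the resulting columns use ArrowDtype instead of NumPy dtypes. A minimal sketch of that call, assuming pandas >= 2.0 with pyarrow installed (the file name here is a placeholder, not something from this diff):

```python
import pandas as pd

# Sketch of the pattern applied throughout this PR; "data.csv" is a
# placeholder path, not a file referenced in the repository.
df = pd.read_csv(
    "data.csv",
    engine="pyarrow",         # parse with pyarrow's CSV reader (pandas >= 1.4)
    dtype_backend="pyarrow",  # keep ArrowDtype columns instead of NumPy (pandas >= 2.0)
)

print(df.dtypes)  # e.g. int64[pyarrow], string[pyarrow], double[pyarrow]
```

The same `dtype_backend="pyarrow"` flag is also threaded through the `pd.read_html` call sites touched below, which accept it as of pandas 2.0.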
10 changes: 6 additions & 4 deletions gridstatus/caiso.py
@@ -888,7 +888,9 @@ def get_curtailed_non_operational_generator_report(

# find index of OUTAGE MRID
test_parse = pd.read_excel(
content, usecols="B:M", sheet_name="PREV_DAY_OUTAGES"
content,
usecols="B:M",
sheet_name="PREV_DAY_OUTAGES",
)
first_col = test_parse[test_parse.columns[0]]
outage_mrid_index = first_col[first_col == "OUTAGE MRID"].index[0] + 1
@@ -947,7 +949,7 @@ def get_curtailed_non_operational_generator_report(
]

assert not df.duplicated(
subset=["Outage MRID", "Curtailment Start Time"]
subset=["Outage MRID", "Curtailment Start Time"],
).any(), "There are still duplicates"

return df
@@ -1180,7 +1182,7 @@ def _get_historical(file, date, verbose=False):
msg = f"Fetching URL: {url}"
log(msg, verbose)

df = pd.read_csv(url)
df = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")

# sometimes there are extra rows at the end, so this lets us ignore them
df = df.dropna(subset=["Time"])
@@ -1243,7 +1245,7 @@ def _get_oasis(config, start, end=None, raw_data=False, verbose=False, sleep=5):
# parse and concat all files
dfs = []
for f in z.namelist():
df = pd.read_csv(z.open(f))
df = pd.read_csv(z.open(f), engine="pyarrow", dtype_backend="pyarrow")
dfs.append(df)

df = pd.concat(dfs)
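Several of the call sites in this PR (the CAISO OASIS download above, plus the ERCOT disclosure and NYISO archive readers below) read CSV members straight out of a downloaded zip archive via `ZipFile.open`. A self-contained sketch of that pattern with the pyarrow-backed reader; the URL is a made-up placeholder, not one of the gridstatus endpoints:

```python
import io
import zipfile

import pandas as pd
import requests

# Placeholder archive URL, for illustration only.
resp = requests.get("https://example.com/report_archive.zip")
resp.raise_for_status()

z = zipfile.ZipFile(io.BytesIO(resp.content))

# Each member is a file-like object, which read_csv hands through to pyarrow.
dfs = [
    pd.read_csv(z.open(name), engine="pyarrow", dtype_backend="pyarrow")
    for name in z.namelist()
    if name.endswith(".csv")
]
df = pd.concat(dfs, ignore_index=True)
```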
54 changes: 37 additions & 17 deletions gridstatus/ercot.py
@@ -414,12 +414,13 @@ def _read_html_display(self, url, verbose=False):
msg = f"Fetching {url}"
log(msg, verbose)

dfs = pd.read_html(url, header=0)
dfs = pd.read_html(url, header=0, dtype_backend="pyarrow")
df = dfs[0]

df["Interval End"] = pd.to_datetime(df["Oper Day"]) + (
df["Hour Ending"] / 100
).astype("timedelta64[h]")
df["Interval End"] = pd.to_datetime(df["Oper Day"]) + pd.to_timedelta(
df["Hour Ending"] / 100,
unit="h",
)
df["Interval End"] = df["Interval End"].dt.tz_localize(
self.default_timezone,
)
@@ -462,8 +463,8 @@ def _get_todays_outlook_non_forecast(self, date, verbose=False):

data["Interval End"] = (
date
+ data["hourEnding"].astype("timedelta64[h]")
+ data["interval"].astype("timedelta64[m]")
+ pd.to_timedelta(data["hourEnding"], unit="h")
+ pd.to_timedelta(data["interval"], unit="m")
)

data["Interval End"] = data["Interval End"].dt.tz_localize(
@@ -1154,9 +1155,13 @@ def _handle_60_day_sced_disclosure(self, z, process=False, verbose=False):
assert gen_resource_file, "Could not find gen resource file"
assert smne_file, "Could not find smne file"

load_resource = pd.read_csv(z.open(load_resource_file))
gen_resource = pd.read_csv(z.open(gen_resource_file))
smne = pd.read_csv(z.open(smne_file))
load_resource = pd.read_csv(
z.open(load_resource_file), engine="pyarrow", dtype_backend="pyarrow"
)
gen_resource = pd.read_csv(
z.open(gen_resource_file), engine="pyarrow", dtype_backend="pyarrow"
)
smne = pd.read_csv(z.open(smne_file), engine="pyarrow", dtype_backend="pyarrow")

def handle_time(df, time_col, is_interval_end=False):
df[time_col] = pd.to_datetime(df[time_col])
@@ -1274,7 +1279,7 @@ def _handle_60_day_dam_disclosure(self, z, process=False, verbose=False):
data = {}

for key, file in files.items():
doc = pd.read_csv(z.open(file))
doc = pd.read_csv(z.open(file), engine="pyarrow", dtype_backend="pyarrow")
# weird that these files don't have this column like all other ERCOT files
# add so we can parse
doc["DSTFlag"] = "N"
@@ -1738,7 +1743,9 @@ def _handle_as_reports_file(self, file_path, verbose):
if as_name in ["ECRSM", "ECRSS"] and cleared not in z.namelist():
continue

df_cleared = pd.read_csv(z.open(cleared))
df_cleared = pd.read_csv(
z.open(cleared), engine="pyarrow", dtype_backend="pyarrow"
)
all_dfs.append(df_cleared)

for as_name in self_arranged_products:
@@ -1748,7 +1755,9 @@ def _handle_as_reports_file(self, file_path, verbose):
if as_name in ["ECRSM", "ECRSS"] and self_arranged not in z.namelist():
continue

df_self_arranged = pd.read_csv(z.open(self_arranged))
df_self_arranged = pd.read_csv(
z.open(self_arranged), engine="pyarrow", dtype_backend="pyarrow"
)
all_dfs.append(df_self_arranged)

def _make_bid_curve(df):
@@ -1764,7 +1773,9 @@ def _make_bid_curve(df):
if as_name in ["ECRSM", "ECRSS"] and offers not in z.namelist():
continue

df_offers = pd.read_csv(z.open(offers))
df_offers = pd.read_csv(
z.open(offers), engine="pyarrow", dtype_backend="pyarrow"
)
name = f"Bid Curve - {as_name}"
if df_offers.empty:
# use last df to get the index
@@ -1830,7 +1841,12 @@ def _handle_sced_system_lambda(self, docs, verbose):
log("Reading SCED System Lambda files", verbose)

df = pd.concat(
[pd.read_csv(i.url, compression="zip") for i in tqdm.tqdm(docs)],
[
pd.read_csv(
i.url, engine="pyarrow", dtype_backend="pyarrow", compression="zip"
)
for i in tqdm.tqdm(docs)
],
)

df["SCED Time Stamp"] = pd.to_datetime(df["SCEDTimeStamp"]).dt.tz_localize(
@@ -2113,12 +2129,16 @@ def _get_settlement_point_mapping(self, verbose=False):
settlement_points_file = [
name for name in names if "Settlement_Points" in name
][0]
df = pd.read_csv(z.open(settlement_points_file))
df = pd.read_csv(
z.open(settlement_points_file), engine="pyarrow", dtype_backend="pyarrow"
)
return df

def read_doc(self, doc, verbose=False):
log(f"Reading {doc.url}", verbose)
df = pd.read_csv(doc.url, compression="zip")
df = pd.read_csv(
doc.url, engine="pyarrow", compression="zip", dtype_backend="pyarrow"
)
return self.parse_doc(df, verbose=verbose)

def parse_doc(self, doc, verbose=False):
@@ -2154,7 +2174,7 @@ def parse_doc(self, doc, verbose=False):

doc["Interval Start"] = (
pd.to_datetime(doc["DeliveryDate"])
+ doc["HourBeginning"].astype("timedelta64[h]")
+ pd.to_timedelta(doc["HourBeginning"], unit="h")
+ ((doc["DeliveryInterval"] - 1) * interval_length)
)

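Beyond the reader swaps, ercot.py also replaces `.astype("timedelta64[h]")` on numeric columns with `pd.to_timedelta(..., unit="h")` (and `unit="m"` for minutes). Casting numbers straight to a timedelta dtype relied on unit handling that changed around pandas 2.0, while `pd.to_timedelta` states the unit explicitly. A small illustration with made-up values:

```python
import pandas as pd

# Hour-ending values as they appear in ERCOT-style reports (made up here).
hours = pd.Series([1, 2, 24])

# Explicit conversion used by the new code: 1 -> Timedelta("0 days 01:00:00"), etc.
offsets = pd.to_timedelta(hours, unit="h")

interval_end = pd.to_datetime("2023-06-01") + offsets
print(interval_end)
```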
10 changes: 7 additions & 3 deletions gridstatus/isone.py
@@ -324,7 +324,8 @@ def get_lmp(
u,
skiprows=[0, 1, 2, 3, 5],
skipfooter=1,
engine="python",
engine="pyarrow",
dtype_backend="pyarrow",
),
)

@@ -521,7 +522,9 @@ def get_interconnection_queue(self, verbose=False):
log(msg, verbose)

r = requests.get("https://irtt.iso-ne.com/reports/external")
queue = pd.read_html(r.text, attrs={"id": "publicqueue"})[0]
queue = pd.read_html(
r.text, attrs={"id": "publicqueue"}, dtype_backend="pyarrow"
)[0]

# only keep generator interconnection requests
queue["Type"] = queue["Type"].map(
@@ -687,7 +690,8 @@ def _make_request(url, skiprows, verbose):
io.StringIO(response.content.decode("utf8")),
skiprows=skiprows,
skipfooter=1,
engine="python",
engine="pyarrow",
dtype_backend="pyarrow",
).drop_duplicates()
return df

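One caveat on the isone.py changes: if I'm reading the pandas parser options correctly, the pyarrow engine does not accept every option the python engine did (skipfooter in particular), so keeping `skipfooter=1` alongside `engine="pyarrow"` may raise a ValueError at call time. A defensive sketch under that assumption; the URL is a placeholder, and the fallback keeps Arrow-backed dtypes either way:

```python
import pandas as pd

URL = "https://example.com/isone_report.csv"  # placeholder, not a real endpoint

try:
    # Assumption: skipfooter is a python-engine-only option, so the pyarrow
    # engine may reject this call with a ValueError.
    df = pd.read_csv(
        URL,
        skipfooter=1,
        engine="pyarrow",
        dtype_backend="pyarrow",
    )
except ValueError:
    # Fall back to the slower python engine but keep ArrowDtype columns,
    # since dtype_backend is applied after parsing regardless of engine.
    df = pd.read_csv(
        URL,
        skipfooter=1,
        engine="python",
        dtype_backend="pyarrow",
    )
```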
10 changes: 7 additions & 3 deletions gridstatus/miso.py
@@ -186,7 +186,7 @@ def get_lmp(self, date, market: str, locations: list = "ALL", verbose=False):
url = today_url

log(f"Downloading LMP data from {url}", verbose)
data = pd.read_csv(url)
data = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")

data["Interval Start"] = (
pd.to_datetime(data["INTERVAL"])
@@ -200,7 +200,9 @@ def get_lmp(self, date, market: str, locations: list = "ALL", verbose=False):
today = utils._handle_date("today", self.default_timezone)
url = f"https://docs.misoenergy.org/marketreports/{today.strftime('%Y%m%d')}_da_expost_lmp.csv" # noqa
log(f"Downloading LMP data from {url}", verbose)
today_dam_data = pd.read_csv(url, skiprows=4)
today_dam_data = pd.read_csv(
url, skiprows=4, engine="pyarrow", dtype_backend="pyarrow"
)
node_to_type = today_dam_data[["Node", "Type"]].drop_duplicates()
data = data.merge(
node_to_type,
@@ -214,7 +216,9 @@ def get_lmp(self, date, market: str, locations: list = "ALL", verbose=False):
elif market == Markets.DAY_AHEAD_HOURLY:
url = f"https://docs.misoenergy.org/marketreports/{date.strftime('%Y%m%d')}_da_expost_lmp.csv" # noqa
log(f"Downloading LMP data from {url}", verbose)
raw_data = pd.read_csv(url, skiprows=4)
raw_data = pd.read_csv(
url, skiprows=4, engine="pyarrow", dtype_backend="pyarrow"
)
data_melted = raw_data.melt(
id_vars=["Node", "Type", "Value"],
value_vars=[col for col in raw_data.columns if col.startswith("HE")],
10 changes: 6 additions & 4 deletions gridstatus/nyiso.py
@@ -529,7 +529,7 @@ def get_generators(self, verbose=False):
msg = f"Requesting {url}"
log(msg, verbose)

df = pd.read_csv(url)
df = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")

# needs to be updated once a year, approximately around the end of April
# find it here: https://www.nyiso.com/gold-book-resources
@@ -662,7 +662,7 @@ def get_loads(self, verbose=False):
msg = f"Requesting {url}"
log(msg, verbose)

df = pd.read_csv(url)
df = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")

return df

@@ -734,7 +734,7 @@ def _download_nyiso_archive(
msg = f"Requesting {csv_url}"
log(msg, verbose)

df = pd.read_csv(csv_url)
df = pd.read_csv(csv_url, engine="pyarrow", dtype_backend="pyarrow")
df = _handle_time(df, dataset_name)
df["File Date"] = date.normalize()
else:
@@ -769,7 +769,9 @@ def _download_nyiso_archive(
msg = f"{csv_filename} not found in {zip_url}"
log(msg, verbose)
continue
df = pd.read_csv(z.open(csv_filename))
df = pd.read_csv(
z.open(csv_filename), engine="pyarrow", dtype_backend="pyarrow"
)
df["File Date"] = d.normalize()

df = _handle_time(df, dataset_name)
14 changes: 7 additions & 7 deletions gridstatus/spp.py
@@ -121,7 +121,7 @@ def get_fuel_mix(self, date, detailed=False, verbose=False):
raise NotSupported

url = f"{FILE_BROWSER_DOWNLOAD_URL}/generation-mix-historical?path=/GenMix2Hour.csv" # noqa
df_raw = pd.read_csv(url)
df_raw = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")
historical_mix = process_gen_mix(df_raw, detailed=detailed)

historical_mix = historical_mix.drop(
@@ -286,7 +286,7 @@ def get_capacity_of_generation_on_outage(self, date, end=None, verbose=False):
msg = f"Downloading {url}"
log(msg, verbose)

df = pd.read_csv(url)
df = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")

return self._process_capacity_of_generation_on_outage(df, publish_time=date)

@@ -366,7 +366,7 @@ def get_ver_curtailments(self, date, end=None, verbose=False):

msg = f"Downloading {url}"
log(msg, verbose)
df = pd.read_csv(url)
df = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")

return self._process_ver_curtailments(df)

@@ -440,7 +440,7 @@ def get_interconnection_queue(self, verbose=False):
msg = f"Getting interconnection queue from {url}"
log(msg, verbose)

queue = pd.read_csv(url, skiprows=1)
queue = pd.read_csv(url, skiprows=1, engine="pyarrow", dtype_backend="pyarrow")

queue["Status (Original)"] = queue["Status"]
completed_val = InterconnectionQueueStatus.COMPLETED.value
@@ -607,7 +607,7 @@ def _get_dam_lmp(
):
url = f"{FILE_BROWSER_DOWNLOAD_URL}/{FS_DAM_LMP_BY_LOCATION}?path=/{date.strftime('%Y')}/{date.strftime('%m')}/By_Day/DA-LMP-SL-{date.strftime('%Y%m%d')}0100.csv" # noqa
log(f"Downloading {url}", verbose=verbose)
df = pd.read_csv(url)
df = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")
return df

def _finalize_spp_df(self, df, market, location_type, verbose=False):
Expand Down Expand Up @@ -698,7 +698,7 @@ def get_lmp_real_time_weis(self, date, verbose=False):
url = f"{FILE_BROWSER_DOWNLOAD_URL}/lmp-by-settlement-location-weis?path=/{date.strftime('%Y')}/{date.strftime('%m')}/By_Day/WEIS-RTBM-LMP-DAILY-SL-{date.strftime('%Y%m%d')}.csv" # noqa
msg = f"Downloading {url}"
log(msg, verbose)
df = pd.read_csv(url)
df = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")

return self._process_lmp_real_time_weis(df)

@@ -760,7 +760,7 @@ def _fetch_and_concat_csvs(self, urls: list, verbose: bool = False):
for url in tqdm.tqdm(urls):
msg = f"Fetching {url}"
log(msg, verbose)
df = pd.read_csv(url)
df = pd.read_csv(url, engine="pyarrow", dtype_backend="pyarrow")
all_dfs.append(df)
return pd.concat(all_dfs)

6 changes: 4 additions & 2 deletions gridstatus/utils.py
@@ -211,7 +211,9 @@ def download_csvs_from_zip_url(url, process_csv=None, verbose=False):
all_dfs = []
for f in z.filelist:
if f.filename.endswith(".csv"):
df = pd.read_csv(z.open(f.filename))
df = pd.read_csv(
z.open(f.filename), engine="pyarrow", dtype_backend="pyarrow"
)
if process_csv:
df = process_csv(df, f.filename)
all_dfs.append(df)
@@ -276,7 +278,7 @@ def load_folder(path, time_zone=None, verbose=True):

dfs = []
for f in tqdm.tqdm(all_files, disable=not verbose):
df = pd.read_csv(f, parse_dates=True)
df = pd.read_csv(f, parse_dates=True, engine="pyarrow", dtype_backend="pyarrow")
dfs.append(df)

data = pd.concat(dfs).reset_index(drop=True)
1 change: 1 addition & 0 deletions pyproject.toml
@@ -23,6 +23,7 @@ requires-python = ">=3.8,<4"
dependencies = [
"requests >= 2.28.1",
"pandas >= 1.3.0",
"pyarrow >= 12.0.0",
"beautifulsoup4 >= 4.8.13",
"tabulate >= 0.8.10",
"tqdm >= 4.64.1",
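On the dependency side, the diff adds a pyarrow floor but leaves `pandas >= 1.3.0` in place; as far as I can tell, `engine="pyarrow"` first shipped in pandas 1.4 and `dtype_backend="pyarrow"` in pandas 2.0, so the pandas pin presumably needs to move up for these options to be available at all.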