DataLoader Examples
[1]:
import os
import re
import numpy as np
import pandas as pd
import xarray as xr
from GPSat.dataloader import DataLoader
from GPSat import get_data_path
from GPSat.utils import cprint
pd.set_option("display.max_columns", 200)
load method
read from csv
[2]:
cprint("-"*20, "BOLD")
cprint("loading individual csv files", "BOLD")
# load data - the engine to use is inferred from the file name
a = DataLoader.load(source=get_data_path("example", "A_RAW.csv"))
print(a.head(2))
# specify the engine, in this case a pandas read_* method
b = DataLoader.load(source=get_data_path("example", "B_RAW.csv"),
engine="read_csv")
# provide additional arguments to read_csv
c = DataLoader.load(source=get_data_path("example", "C_RAW.csv"),
engine="read_csv",
source_kwargs={"sep": ','})
--------------------
loading individual csv files
'data_select': 0.227 seconds
'load': 0.228 seconds
lon lat datetime z
0 -57.190860 65.647048 2020-03-01 00:03:32.948838 0.0330
1 -57.224372 65.679991 2020-03-01 00:03:36.667280 0.1057
'data_select': 0.197 seconds
'load': 0.198 seconds
'data_select': 0.335 seconds
'load': 0.336 seconds
load method cont'd
save tab separated file
[3]:
cprint("-"*20, "BOLD")
cprint("loading from tab separated file", "BOLD")
# (store and) read from a tab separated file
tsv_file = get_data_path("example", "tmp.tsv")
c.to_csv(tsv_file, sep="\t", index=False)
# load tsv file, providing additional (source keyword) arguments
# - which will be passed into pd.read_csv
_ = DataLoader.load(source=tsv_file,
engine="read_csv",
source_kwargs={"sep": "\t", "keep_default_na": True})
pd.testing.assert_frame_equal(c, _)
--------------------
loading from tab separated file
'data_select': 0.300 seconds
'load': 0.300 seconds
read in multiple files in a folder
identify files using a regular expression and apply col_funcs after the data is loaded into memory - see below for details on col_funcs
[4]:
col_funcs = {
"source": {
"func": lambda x: re.sub('_RAW.*$', '', os.path.basename(x)),
"filename_as_arg": True
},
"datetime": {
"func": lambda x: x.astype('datetime64[ns]'),
"col_args": "datetime"
}
}
# can also supply a list of sub-directories (sub_dirs) to look in, instead of file_dirs
_ = DataLoader.read_flat_files(file_dirs=get_data_path("example"),
file_regex=r"_RAW\.csv$",
col_funcs=col_funcs)
cprint("head:", "BOLD")
print(_.head(2))
cprint("dtypes:", "BOLD")
print(_.dtypes)
----------------------------------------------------------------------------------------------------
reading files from:
/home/runner/work/GPSat/GPSat/data/example/
that match regular expression: _RAW\.csv$
'read_from_multiple_files': 0.796 seconds
head:
lon lat datetime z source
0 -123.678061 81.395356 2020-03-01 00:00:00.398736 0.5537 B
1 -123.717808 81.394936 2020-03-01 00:00:02.014304 0.2966 B
dtypes:
lon float64
lat float64
datetime datetime64[ns]
z float64
source object
dtype: object
save and read from parquet file
[5]:
cprint("-"*20, "BOLD")
cprint("loading from parquet file", "BOLD")
# store as a parquet file - to demonstrate reading it back in
parq_tmp = get_data_path("example", "tmp.parquet")
# NOTE: fastparquet allows for appending; an alternative is pyarrow (or 'auto')
_.to_parquet(parq_tmp, engine="fastparquet")
# read data from parquet file - pd.read_parquet defaults to engine='auto', which prefers pyarrow
df = DataLoader.load(source=parq_tmp)
pd.testing.assert_frame_equal(df, _)
--------------------
loading from parquet file
'data_select': 0.079 seconds
'load': 0.079 seconds
save and read from hdf5 file
[6]:
cprint("-"*20, "BOLD")
cprint("loading from hdf5 file", "BOLD")
# store as an h5 file - to demonstrate reading it back in
hdf5_tmp = get_data_path("example", "tmp.h5")
hdf5_table = "data"
with pd.HDFStore(hdf5_tmp, mode="w") as store:
# setting data_columns=True so the columns will be searchable (usable in 'where' conditions)
store.append(key=hdf5_table, value=_, data_columns=True)
# read data from table in hdf5
df = DataLoader.load(source=hdf5_tmp,
table=hdf5_table)
pd.testing.assert_frame_equal(df, _)
--------------------
loading from hdf5 file
closing
'data_select': 0.397 seconds
'load': 0.398 seconds
save and read from netcdf file
[7]:
cprint("-"*20, "BOLD")
cprint("loading from netcdf file", "BOLD")
# first - save to file, using a multi-index for the dimensions
tmp_ = _.set_index(['datetime', 'source'])
ds = xr.Dataset.from_dataframe(tmp_)
netcdf_tmp = get_data_path("example", "tmp.nc")
ds.to_netcdf(path=netcdf_tmp)
# read data from netcdf file
# NOTE: this will come back with a multi-index - it can be reset with reset_index=True
nc = DataLoader.load(source=netcdf_tmp)
# netcdf will have nans for missing values
# - netcdf effectively stores values in an n-d array, with the
#   dimensions determined by the index used when converting from a DataFrame
# nc.dropna(inplace=True)
# sort indices (and data) in the same way
tmp_.sort_index(inplace=True)
nc.sort_index(inplace=True)
# ensure columns are in the same order - might not be needed
nc = nc[tmp_.columns]
pd.testing.assert_frame_equal(tmp_, nc)
--------------------
loading from netcdf file
'data_select': 0.154 seconds
'load': 0.200 seconds
use ‘where’ to select a subset without having to read the entire dataset into memory
this is accomplished by using a ‘where dict’, containing the following keys:
- ‘col’: the column of the data used for selection
- ‘comp’: the comparison to use, e.g. “>”, “>=”, “==”, “!=”, “<=”, “<”
- ‘val’: the value the column values are compared against
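For example, the following ‘where dict’ keeps rows where lat is at least 65.0 (the same condition used in the cells below):
where = {"col": "lat", "comp": ">=", "val": 65.0}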
where using hdf5
[8]:
a = _.loc[_['source'] == "A"].copy(True)
cprint("-"*20, "BOLD")
cprint("applying 'where' dictionaries - selecting data at source", "BOLD")
df = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
where={"col": "source", "comp": "==", "val": "A"})
pd.testing.assert_frame_equal(a, df)
# hdf5 allows for a list of values to be provided
df = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
where={"col": "source", "comp": "==", "val": ["A", "B"]}
)
# NOTE: unique() does not sort its result, hence sorting before comparing
np.testing.assert_array_equal(np.sort(df['source'].unique()),
np.array(['A', 'B'], dtype=object))
# multiple 'where dicts' can be combined in a list
# - they will be combined with an AND operation
df = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
where=[
{"col": "source", "comp": "==", "val": "A"},
{"col": "lat", "comp": ">=", "val": 65.0}
]
)
assert df['lat'].min() >= 65.0
pd.testing.assert_frame_equal(a.loc[a['lat'] >= 65.0], df)
--------------------
applying 'where' dictionaries - selecting data by source
closing
'data_select': 0.280 seconds
'load': 0.281 seconds
closing
'data_select': 0.499 seconds
'load': 0.500 seconds
closing
'data_select': 0.348 seconds
'load': 0.349 seconds
where using netcdf files
[9]:
# apply where_dict to a netcdf file - must be on the index, in this case "source" or "datetime"
nc = DataLoader.load(source=netcdf_tmp,
where=[
{"col": "source", "comp": "==", "val": "A"}
],
reset_index=True
)
np.testing.assert_array_equal(nc['source'].unique(), np.array(['A'], dtype=object))
'data_select': 0.137 seconds
'load': 0.168 seconds
where using parquet
[10]:
# when using parquet / read_parquet the wheres get converted to 'filters' - see pd.read_parquet for more details
df = DataLoader.load(source=parq_tmp,
where=[
{"col": "source", "comp": "==", "val": "A"},
{"col": "lat", "comp": ">=", "val": 65.0}
]
)
assert df['lat'].min() >= 65.0
pd.testing.assert_frame_equal(a.loc[a['lat'] >= 65.0], df)
# NOTE: when querying on datetime / timestamp columns, convert the comparison
# value to a datetime via pd.Timestamp
max_time = pd.Timestamp("2020-03-05")
df = DataLoader.load(source=parq_tmp,
where=[
{"col": "datetime", "comp": "<=", "val": max_time}
]
)
pd.testing.assert_frame_equal(_.loc[_['datetime'] <= max_time], df)
'data_select': 0.058 seconds
'load': 0.058 seconds
'data_select': 0.047 seconds
'load': 0.047 seconds
use ‘row_select’ to select a subset after the data is loaded into memory
[11]:
cprint("-"*20, "BOLD")
cprint("row_select examples", "BOLD")
# a 'where dict' can also be used for row_select
df0 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
row_select={"col": "source", "comp": "==", "val": "A"})
# NOTE: using where is faster, as the subset is selected while reading from file
df1 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
where={"col": "source", "comp": "==", "val": "A"})
pd.testing.assert_frame_equal(df0, df1)
# row_select also allows lambda functions that return a bool array
# - col_args specifies the columns of the data to pass in as arguments to "func"
df0 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
row_select={
"func": lambda x: x >= 65.0,
"col_args": "lat"
})
assert df0['lat'].min() >= 65.0
# the lambda function can be supplied as a string - useful when passing parameters from a json configuration
# NOTE: if func is a string it will be converted with eval(...)
df1 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
row_select={
"func": "lambda x: x >= 65.0",
"col_args": "lat"
})
pd.testing.assert_frame_equal(df0, df1)
--------------------
row_select examples
closing
'data_select': 0.398 seconds
'load': 0.460 seconds
closing
'data_select': 0.295 seconds
'load': 0.296 seconds
closing
'data_select': 0.397 seconds
'load': 0.423 seconds
closing
'data_select': 0.394 seconds
'load': 0.424 seconds
Advanced: more row_select and where
[12]:
cprint("-"*20, "BOLD")
cprint("more row_select and where", "BOLD")
# multiple columns can be supplied
df2 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
row_select={
"func": "lambda x, y: (x >= 65.0) & (y == 'A')",
"col_args": ["lat", "source"]
})
assert df2['lat'].min() >= 65.0
np.testing.assert_array_equal(df2['source'].unique(), np.array(['A'], dtype=object))
# column values can be supplied via col_kwargs
# - this can be useful if a more involved function is supplied
df2 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
row_select={
"func": "lambda x, y: (x >= 65.0) & (y >= 0)",
"col_kwargs": {
"x": "lat", "y": "lon"
}
})
assert df2['lat'].min() >= 65.0
assert df2['lon'].min() >= 0.0
# row_select can be negated (or inverted) - flipping True values to False and vice versa
# - this can be useful when defining hold-out data
# - e.g. create a row_select that selects the desired data, then use negate to make sure it's excluded
df3 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
row_select={
"func": "lambda x: (x >= 65.0)",
"col_args": "lat",
"negate": True
})
assert df3['lat'].max() < 65.0
df3 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
row_select={
"func": "lambda x, y: (x >= 65.0) & (y >= 0)",
"col_kwargs": {
"x": "lat", "y": "lon"
},
"negate": True
})
# there should be no rows with lat>=65.0 AND lon>=0
assert len(df3.loc[(df3['lat'] >= 65.0) & (df3['lon'] >= 0.0)]) == 0
# multiple row_selects can be combined via a list
# - similar to where, they are combined via an AND boolean operation
df4 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
row_select=[
{"col": "source", "comp": "==", "val": "A"},
{
"func": "lambda x: (x >= 65.0)",
"col_args": "lat"
},
{
"func": "lambda y: y >= 0.0",
"col_kwargs": {"y": "lon"}
},
{
"func": "lambda y: y >= 0.0",
"col_args": "z"
}
])
assert df4['lat'].min() >= 65.0
assert df4['lon'].min() >= 0.0
assert df4['z'].min() >= 0.0
np.testing.assert_array_equal(df4['source'].unique(), np.array(['A'], dtype=object))
# where and row_select can be used together
# - wheres are applied first, when reading data in from file
# - row_selects are then applied to the data in memory
df5 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
where={"col": "source", "comp": "==", "val": "A"},
row_select={"col": "source", "comp": "==", "val": "B"})
assert len(df5) == 0
# TODO: show where and row_select using netcdf data - see the sketch after this cell's output
--------------------
more row_select and where
closing
'data_select': 0.395 seconds
'load': 0.425 seconds
closing
'data_select': 0.396 seconds
'load': 0.416 seconds
closing
'data_select': 0.399 seconds
'load': 0.417 seconds
closing
'data_select': 0.397 seconds
'load': 0.421 seconds
closing
'data_select': 0.408 seconds
'load': 0.476 seconds
closing
'data_select': 0.282 seconds
'load': 0.300 seconds
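Following up on the TODO above: a minimal sketch (not executed here) combining where and row_select for a netcdf source, reusing netcdf_tmp from earlier. The where must target an index dimension (here "source" or "datetime"), while row_select operates on the data once loaded:
# sketch only - combines patterns demonstrated separately above
nc = DataLoader.load(source=netcdf_tmp,
                     where=[{"col": "source", "comp": "==", "val": "A"}],
                     row_select={"func": "lambda x: x >= 65.0", "col_args": "lat"},
                     reset_index=True)
assert nc['lat'].min() >= 65.0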
Advanced: col_funcs - apply functions to create or modify columns
column functions are specified with a dict: each key is the new (or existing) column name, and each value is a dict specifying how that column shall be created
- NOTE: by default columns are extracted from the dataframe as numpy arrays (so there is no need to take .values)
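A minimal sketch of the expected structure, based on the usage in cells [4] above and [13] below ("z_doubled" is purely illustrative); "func" may be a callable or a string that is eval'd to one, and "col_args" names the column(s) passed in positionally:
col_funcs = {
    "z_doubled": {                  # key: name of the column to create (or modify)
        "func": "lambda x: x * 2",  # a callable, or a string eval'd to a callable
        "col_args": "z"             # column(s) passed in as positional numpy arrays
    }
}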
[13]:
cprint("-"*20, "BOLD")
cprint("applying col(umn)_func(tion)s to data after reading into memory", "BOLD")
# add a column
df1 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
col_funcs={
"z_pos": {
"func": "lambda x: np.where(x>0, x, np.nan)",
"col_args": "z"
}
})
assert np.all(df1.loc[np.isnan(df1['z_pos']), "z"] <= 0)
# modify / create a 'date' column
df1 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table,
col_funcs={
"date": {
"func": "lambda x: x.astype('datetime64[D]')",
"col_args": "datetime"
}
})
# reference dataframe: column modified after load
df0 = DataLoader.load(source=hdf5_tmp,
table=hdf5_table)
# convert datetime to date (day resolution)
df0['date'] = df0['datetime'].values.astype('datetime64[D]')
pd.testing.assert_frame_equal(df0, df1)
--------------------
applying col(umn)_func(tion)s to data after reading into memory
closing
'data_select': 0.396 seconds
'load': 0.401 seconds
closing
'data_select': 0.400 seconds
'load': 0.451 seconds
closing
'data_select': 0.396 seconds
'load': 0.397 seconds
add_data_to_col, col_select, reset_index
[14]:
# TODO: add simple examples of above
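In lieu of the TODO above, a hedged sketch (not executed): the parameter names come from this section's heading, but the semantics assumed here - col_select as a list of columns to keep, add_data_to_col as a mapping from a (hypothetical) new column name to a value to assign - should be checked against the DataLoader documentation.
# assumed semantics - verify against DataLoader's docstrings
df = DataLoader.load(source=hdf5_tmp,
                     table=hdf5_table,
                     col_select=["lon", "lat", "z"],  # keep only these columns (assumed)
                     add_data_to_col={"flag": 1},     # add a constant-value column (assumed; "flag" is hypothetical)
                     reset_index=True)                # reset the index after loading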
remove tmp files
[15]:
cprint("-"*20, "BOLD")
cprint("cleaning up: delete tmp files", "BOLD")
# delete tmp files
for i in [netcdf_tmp, hdf5_tmp, parq_tmp, tsv_file]:
print(f"removing tmp file: {i}")
os.remove(i)
assert not os.path.exists(i)
--------------------
cleaning up: delete tmp files
removing tmp file: /home/runner/work/GPSat/GPSat/data/example/tmp.nc
removing tmp file: /home/runner/work/GPSat/GPSat/data/example/tmp.h5
removing tmp file: /home/runner/work/GPSat/GPSat/data/example/tmp.parquet
removing tmp file: /home/runner/work/GPSat/GPSat/data/example/tmp.tsv