DataLoader Examples

[1]:
import os
import re
import numpy as np
import pandas as pd
import xarray as xr

from GPSat.dataloader import DataLoader
from GPSat import get_data_path
from GPSat.utils import cprint

pd.set_option("display.max_columns", 200)

load method

read from csv

[2]:

cprint("-"*20, "BOLD") cprint("loading individual csv files", "BOLD") # load data, engine to use is determined by file name a = DataLoader.load(source=get_data_path("example", "A_RAW.csv")) print(a.head(2)) # specify engine, in this case a panda's read_* method b = DataLoader.load(source=get_data_path("example", "B_RAW.csv"), engine="read_csv") # provide additional arguments to read_csv c = DataLoader.load(source=get_data_path("example", "C_RAW.csv"), engine="read_csv", source_kwargs={"sep": ','})
--------------------
loading individual csv files
'data_select': 0.227 seconds
'load': 0.228 seconds
         lon        lat                    datetime       z
0 -57.190860  65.647048  2020-03-01 00:03:32.948838  0.0330
1 -57.224372  65.679991  2020-03-01 00:03:36.667280  0.1057
'data_select': 0.197 seconds
'load': 0.198 seconds
'data_select': 0.335 seconds
'load': 0.336 seconds

load method (continued)

save and read from tab-separated file

[3]:

cprint("-"*20, "BOLD") cprint("loading from tab separated file", "BOLD") # (store and) read from tab seperated tsv_file = get_data_path("example", "tmp.tsv") c.to_csv(tsv_file, sep="\t", index=False) # load tsv file, providing additional (source keyword) arguments # - which will be passed into pd.read_csv _ = DataLoader.load(source=tsv_file, engine="read_csv", source_kwargs={"sep": "\t", "keep_default_na": True}) pd.testing.assert_frame_equal(c, _)
--------------------
loading from tab separated file
'data_select': 0.300 seconds
'load': 0.300 seconds

read in multiple files in a folder

identify files using a regular expression; apply col_funcs after the data is loaded into memory - see below for details on col_funcs

[4]:

col_funcs = {
    "source": {
        "func": lambda x: re.sub('_RAW.*$', '', os.path.basename(x)),
        "filename_as_arg": True
    },
    "datetime": {
        "func": lambda x: x.astype('datetime64[ns]'),
        "col_args": "datetime"
    }
}

# can also supply a list of sub-directories (sub_dirs) to look in, instead of file_dirs
_ = DataLoader.read_flat_files(file_dirs=get_data_path("example"),
                               file_regex=r"_RAW\.csv$",
                               col_funcs=col_funcs)

cprint("head:", "BOLD")
print(_.head(2))
cprint("dtypes:", "BOLD")
print(_.dtypes)
----------------------------------------------------------------------------------------------------
reading files from:
/home/runner/work/GPSat/GPSat/data/example/
that match regular expression: _RAW\.csv$
'read_from_multiple_files': 0.796 seconds
head:
          lon        lat                   datetime       z source
0 -123.678061  81.395356 2020-03-01 00:00:00.398736  0.5537      B
1 -123.717808  81.394936 2020-03-01 00:00:02.014304  0.2966      B
dtypes:
lon                float64
lat                float64
datetime    datetime64[ns]
z                  float64
source              object
dtype: object

save and read from parquet file

[5]:
cprint("-"*20, "BOLD")
cprint("loading from parquet file", "BOLD")
# store as a parquet file - to demonstrate reading in
parq_tmp = get_data_path("example", "tmp.parquet")

# NOTE: fastparquet allows for appending; an alternative is pyarrow (or 'auto')
_.to_parquet(parq_tmp, engine="fastparquet")

# read data from the parquet file - engine determined by the file extension
df = DataLoader.load(source=parq_tmp)

pd.testing.assert_frame_equal(df, _)
--------------------
loading from parquet file
'data_select': 0.079 seconds
'load': 0.079 seconds

save and read from hdf5 file

[6]:
cprint("-"*20, "BOLD")
cprint("loading from hdf5 file", "BOLD")
# store as h5 file - to demonstrate reading in
hdf5_tmp = get_data_path("example", "tmp.h5")
hdf5_table = "data"
with pd.HDFStore(hdf5_tmp, mode="w") as store:
    # setting data_columns=True so the columns will be searchable
    store.append(key=hdf5_table, value=_, data_columns=True)

# read data from table in hdf5
df = DataLoader.load(source=hdf5_tmp,
                     table=hdf5_table)

pd.testing.assert_frame_equal(df, _)
--------------------
loading from hdf5 file
closing
'data_select': 0.397 seconds
'load': 0.398 seconds

save and read from netcdf file

[7]:

cprint("-"*20, "BOLD") cprint("loading from netcdf file", "BOLD") # first - save to file, using multi index for dimensions tmp_ = _.set_index(['datetime', 'source']) ds = xr.Dataset.from_dataframe(tmp_) netcdf_tmp = get_data_path("example", "tmp.nc") ds.to_netcdf(path=netcdf_tmp) # read data from netcdf file # NOTE: this will comeback with a multi index - can reset with reset_index=True nc = DataLoader.load(source=netcdf_tmp) # netcdf will have nans for missing values # - netcdf effectively stores values in n-d array with the # - dimensions determined by an index when converting from DataFrame # nc.dropna(inplace=True) # sort indices (and data) in the same way tmp_.sort_index(inplace=True) nc.sort_index(inplace=True) # ensure columns are in same order - might not be needed nc = nc[tmp_.columns] pd.testing.assert_frame_equal(tmp_, nc)
--------------------
loading from netcdf file
'data_select': 0.154 seconds
'load': 0.200 seconds
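
To see why the dropna above is needed: converting a DataFrame with a multi index into an xarray Dataset fills in the full cartesian product of the index levels, so any index combination absent from the original data comes back as a NaN row. A tiny self-contained illustration (toy data, for demonstration only):

import pandas as pd
import xarray as xr

df = pd.DataFrame({"t": [1, 1, 2], "s": ["A", "B", "A"], "z": [0.1, 0.2, 0.3]})
ds = xr.Dataset.from_dataframe(df.set_index(["t", "s"]))

# the round trip now contains the (t=2, s="B") combination, with z = NaN
print(ds.to_dataframe())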

use ‘where’ to select a subset without reading the entire dataset into memory

this is accomplished by using a ‘where dict’, containing the following keys:

- ‘col’: the column of the data used for selection
- ‘comp’: the comparison to use, e.g. “>”, “>=”, “==”, “!=”, “<=”, “<”
- ‘val’: the value the column values are compared against
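
For illustration only, here is a minimal, self-contained sketch of the selection a single ‘where dict’ expresses, applied to an in-memory DataFrame. The mapping of ‘comp’ strings to operators below is illustrative, not GPSat internals; in practice the selection is pushed down to the storage engine at read time:

import operator

import pandas as pd

# map the 'comp' strings onto python comparison operators (illustrative only)
comp_ops = {">": operator.gt, ">=": operator.ge, "==": operator.eq,
            "!=": operator.ne, "<=": operator.le, "<": operator.lt}

where = {"col": "lat", "comp": ">=", "val": 65.0}

df = pd.DataFrame({"lat": [60.0, 66.0, 70.0], "z": [0.1, 0.2, 0.3]})
mask = comp_ops[where["comp"]](df[where["col"]], where["val"])
print(df.loc[mask])  # keeps only the rows with lat >= 65.0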

where using hdf5

[8]:

a = _.loc[_['source'] == "A"].copy(True)

cprint("-"*20, "BOLD")
cprint("applying 'where' dictionaries - selecting data at source", "BOLD")

df = DataLoader.load(source=hdf5_tmp,
                     table=hdf5_table,
                     where={"col": "source", "comp": "==", "val": "A"})

pd.testing.assert_frame_equal(a, df)

# hdf5 allows for a list of values to be provided
df = DataLoader.load(source=hdf5_tmp,
                     table=hdf5_table,
                     where={"col": "source", "comp": "==", "val": ["A", "B"]})

# unique() preserves order of appearance, so sort before comparing
np.testing.assert_array_equal(np.sort(df['source'].unique()),
                              np.array(['A', 'B'], dtype=object))

# multiple 'where dicts' can be combined in a list
# - they will be combined with an AND operation
df = DataLoader.load(source=hdf5_tmp,
                     table=hdf5_table,
                     where=[
                         {"col": "source", "comp": "==", "val": "A"},
                         {"col": "lat", "comp": ">=", "val": 65.0}
                     ])

assert df['lat'].min() >= 65.0
pd.testing.assert_frame_equal(a.loc[a['lat'] >= 65.0], df)
--------------------
applying 'where' dictionaries - selecting data at source
closing
'data_select': 0.280 seconds
'load': 0.281 seconds
closing
'data_select': 0.499 seconds
'load': 0.500 seconds
closing
'data_select': 0.348 seconds
'load': 0.349 seconds

where using netcdf files

[9]:

# apply a where dict to a netcdf file - must be for an index, in this case "source" or "datetime"
nc = DataLoader.load(source=netcdf_tmp,
                     where=[
                         {"col": "source", "comp": "==", "val": "A"}
                     ],
                     reset_index=True)

np.testing.assert_array_equal(nc['source'].unique(),
                              np.array(['A'], dtype=object))
'data_select': 0.137 seconds
'load': 0.168 seconds

where using parquet

[10]:

# with parquet / read_parquet the wheres get converted to 'filters' - see pd.read_parquet for more details
df = DataLoader.load(source=parq_tmp,
                     where=[
                         {"col": "source", "comp": "==", "val": "A"},
                         {"col": "lat", "comp": ">=", "val": 65.0}
                     ])

assert df['lat'].min() >= 65.0
pd.testing.assert_frame_equal(a.loc[a['lat'] >= 65.0], df)

# NOTE: when querying on datetime / timestamp values, convert them via pd.Timestamp
max_time = pd.Timestamp("2020-03-05")
df = DataLoader.load(source=parq_tmp,
                     where=[
                         {"col": "datetime", "comp": "<=", "val": max_time}
                     ])

pd.testing.assert_frame_equal(_.loc[_['datetime'] <= max_time], df)
'data_select': 0.058 seconds
'load': 0.058 seconds
'data_select': 0.047 seconds
'load': 0.047 seconds

use ‘row_select’ to select a subset after the data is loaded into memory

[11]:


cprint("-"*20, "BOLD") cprint("row_select examples", "BOLD") # 'where dict' can be used for row_select df0 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, row_select={"col": "source", "comp": "==", "val": "A"}) # NOTE: using where is faster df1 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, where={"col": "source", "comp": "==", "val": "A"}) pd.testing.assert_frame_equal(df0, df1) # row select allows for using lambda functions that returns a bool array # - col_args specify the columns of the data to pass in as arguments to "func" df0 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, row_select={ "func": lambda x: x >= 65.0, "col_args": "lat" }) assert df0['lat'].min() >= 65.0 # the lambda functions can be supplied as strings - useful when passing parameters from a json configuration # NOTE: if func is a string it will be converted with eval(...) df1 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, row_select={ "func": "lambda x: x >= 65.0", "col_args": "lat" }) pd.testing.assert_frame_equal(df0, df1)
--------------------
row_select examples
closing
'data_select': 0.398 seconds
'load': 0.460 seconds
closing
'data_select': 0.295 seconds
'load': 0.296 seconds
closing
'data_select': 0.397 seconds
'load': 0.423 seconds
closing
'data_select': 0.394 seconds
'load': 0.424 seconds

Advanced: more row_select and where

[12]:

cprint("-"*20, "BOLD") cprint("more row_select and where", "BOLD") # multiple columns can be supplied df2 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, row_select={ "func": "lambda x, y: (x >= 65.0) & (y == 'A')", "col_args": ["lat", "source"] }) assert df2['lat'].min() >= 65.0 np.testing.assert_array_equal(df2['source'].unique(), np.array(['A'], dtype=object)) # column values can be supplied via col_kwargs # - this can be useful if a more involved function is supplied df2 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, row_select={ "func": "lambda x, y: (x >= 65.0) & (y >= 0)", "col_kwargs": { "x": "lat", "y": "lon" } }) assert df2['lat'].min() >= 65.0 assert df2['lon'].min() >= 0.0 # row_select can be negated (or inverted) - flipping Trues to False and vice versa # - this can be useful when defining hold out data # - e.g. create a row_select that selects the desired data and then use negate to make sure it's excluded df3 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, row_select={ "func": "lambda x: (x >= 65.0)", "col_args": "lat", "negate": True }) assert df3['lat'].max() < 65.0 df3 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, row_select={ "func": "lambda x, y: (x >= 65.0) & (y >= 0)", "col_kwargs": { "x": "lat", "y": "lon" }, "negate": True }) # there should be no rows with lat>=65.0 AND lon>=0 assert len(df3.loc[(df3['lat'] >= 65.0) & (df3['lon'] >= 0.0)]) == 0 # multiple row_selects can be combined via a list # - similar to where, they are combined via an AND boolean operation df4 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, row_select=[ {"col": "source", "comp": "==", "val": "A"}, { "func": "lambda x: (x >= 65.0)", "col_args": "lat" }, { "func": "lambda y: y >= 0.0", "col_kwargs": {"y": "lon"} }, { "func": "lambda y: y >= 0.0", "col_args": "z" } ]) assert df4['lat'].min() >= 65.0 assert df4['lon'].min() >= 0.0 assert df4['z'].min() >= 0.0 np.testing.assert_array_equal(df4['source'].unique(), np.array(['A'], dtype=object)) # where and row_selects can be used together # - where's are used first, when reading data in from file # - row_selects are applied to the data in memory df5 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, where={"col": "source", "comp": "==", "val": "A"}, row_select={"col": "source", "comp": "==", "val": "B"}) assert len(df5) == 0 # TODO: show where and row_select using netcdf data
--------------------
more row_select and where
closing
'data_select': 0.395 seconds
'load': 0.425 seconds
closing
'data_select': 0.396 seconds
'load': 0.416 seconds
closing
'data_select': 0.399 seconds
'load': 0.417 seconds
closing
'data_select': 0.397 seconds
'load': 0.421 seconds
closing
'data_select': 0.408 seconds
'load': 0.476 seconds
closing
'data_select': 0.282 seconds
'load': 0.300 seconds

Advanced: col_funcs - apply functions to create or modify columns

col_funcs takes a dict where each key is the new (or existing) column name and each value is a dict specifying how that column shall be created. NOTE: by default, columns are extracted from the DataFrame as numpy arrays (so there is no need to take .values).
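
A minimal sketch of the structure (the column name and function here are illustrative):

col_funcs = {
    "z_doubled": {                   # key: the column to create (or overwrite)
        "func": "lambda x: x * 2",   # a string func is converted with eval(...)
        "col_args": "z"              # the column(s) passed to func, as numpy arrays
    }
}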

[13]:

cprint("-"*20, "BOLD") cprint("applying col(umn)_func(tion)s to data after reading into memory", "BOLD") # add a column df1 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, col_funcs={ "z_pos": { "func": "lambda x: np.where(x>0, x, np.nan)", "col_args": "z" } }) assert np.all(df1.loc[np.isnan(df1['z_pos']), "z"] <= 0) # modify / create a 'date' column df1 = DataLoader.load(source=hdf5_tmp, table=hdf5_table, col_funcs={ "date": { "func": "lambda x: x.astype('datetime64[D]')", "col_args": "datetime" } }) # reference dataframe: column modified after load df0 = DataLoader.load(source=hdf5_tmp, table=hdf5_table) # convert datetime from df0['date'] = df0['datetime'].values.astype('datetime64[D]') pd.testing.assert_frame_equal(df0, df1)
--------------------
applying col(umn)_func(tion)s to data after reading into memory
closing
'data_select': 0.396 seconds
'load': 0.401 seconds
closing
'data_select': 0.400 seconds
'load': 0.451 seconds
closing
'data_select': 0.396 seconds
'load': 0.397 seconds

add_data_to_col, col_select, reset_index

[14]:
# TODO: add simple examples of above
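
A rough sketch of how these arguments might be used; the behaviour here is assumed, not verified against the API (col_select assumed to take a list of columns to keep, add_data_to_col assumed to map a column name to a fixed value to assign; reset_index was demonstrated above with the netcdf source):

# hypothetical usage - parameter behaviour assumed, not verified
df = DataLoader.load(source=hdf5_tmp,
                     table=hdf5_table,
                     col_select=["lon", "lat", "z"],        # assumed: keep only these columns
                     add_data_to_col={"batch": "example"},  # assumed: assign a fixed value to a (new) column
                     reset_index=False)                     # leave any index as-is (see netcdf example above)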

remove tmp files

[15]:

cprint("-"*20, "BOLD") cprint("cleaning up: delete tmp files", "BOLD") # delete tmp files for i in [netcdf_tmp, hdf5_tmp, parq_tmp, tsv_file]: print(f"removing tmp file: {i}") os.remove(i) assert not os.path.exists(i)
--------------------
cleaning up: delete tmp files
removing tmp file: /home/runner/work/GPSat/GPSat/data/example/tmp.nc
removing tmp file: /home/runner/work/GPSat/GPSat/data/example/tmp.h5
removing tmp file: /home/runner/work/GPSat/GPSat/data/example/tmp.parquet
removing tmp file: /home/runner/work/GPSat/GPSat/data/example/tmp.tsv