{ "cells": [ { "cell_type": "markdown", "id": "a384d9f2", "metadata": { "lines_to_next_cell": 0 }, "source": [ "# DataLoader Examples" ] }, { "cell_type": "code", "execution_count": null, "id": "fdcd3d8b", "metadata": {}, "outputs": [], "source": [ "import os\n", "import re\n", "import numpy as np\n", "import pandas as pd\n", "import xarray as xr\n", "\n", "from GPSat.dataloader import DataLoader\n", "from GPSat import get_data_path\n", "from GPSat.utils import cprint\n", "\n", "pd.set_option(\"display.max_columns\", 200)" ] }, { "cell_type": "markdown", "id": "307ce383", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## load method\n", "\n", "read from csv" ] }, { "cell_type": "code", "execution_count": null, "id": "507fef2f", "metadata": {}, "outputs": [], "source": [ "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"loading individual csv files\", \"BOLD\")\n", "\n", "# load data, engine to use is determined by file name\n", "a = DataLoader.load(source=get_data_path(\"example\", \"A_RAW.csv\"))\n", "print(a.head(2))\n", "\n", "# specify engine, in this case a panda's read_* method\n", "b = DataLoader.load(source=get_data_path(\"example\", \"B_RAW.csv\"),\n", " engine=\"read_csv\")\n", "\n", "# provide additional arguments to read_csv\n", "c = DataLoader.load(source=get_data_path(\"example\", \"C_RAW.csv\"),\n", " engine=\"read_csv\",\n", " source_kwargs={\"sep\": ','})" ] }, { "cell_type": "markdown", "id": "0338bdb1", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## load method con't\n", "\n", "save tab seperated file" ] }, { "cell_type": "code", "execution_count": null, "id": "cf2b6060", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"loading from tab separated file\", \"BOLD\")\n", "\n", "# (store and) read from tab seperated\n", "tsv_file = get_data_path(\"example\", \"tmp.tsv\")\n", "c.to_csv(tsv_file, sep=\"\\t\", index=False)\n", "\n", "# load tsv file, providing additional (source keyword) arguments\n", "# - which will be passed into pd.read_csv\n", "_ = DataLoader.load(source=tsv_file,\n", " engine=\"read_csv\",\n", " source_kwargs={\"sep\": \"\\t\", \"keep_default_na\": True})\n", "\n", "pd.testing.assert_frame_equal(c, _)" ] }, { "cell_type": "markdown", "id": "8272825d", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## read in multiple files in a folder\n", "\n", "identify files using a regular expression\n", "apply col_funcs after loaded into memory - see below for details on col_funcs" ] }, { "cell_type": "code", "execution_count": null, "id": "41b95149", "metadata": {}, "outputs": [], "source": [ "\n", "col_funcs = {\n", " \"source\": {\n", " \"func\": lambda x: re.sub('_RAW.*$', '', os.path.basename(x)),\n", " \"filename_as_arg\": True\n", " },\n", " \"datetime\": {\n", " \"func\": lambda x: x.astype('datetime64[ns]'),\n", " \"col_args\": \"datetime\"\n", " }\n", "}\n", "\n", "# can also supply a list of sub-directories (sub_dirs) to look instead of file_dirs\n", "_ = DataLoader.read_flat_files(file_dirs=get_data_path(\"example\"),\n", " file_regex=\"_RAW\\.csv$\",\n", " col_funcs=col_funcs)\n", "\n", "cprint(\"head:\", \"BOLD\")\n", "print(_.head(2))\n", "\n", "cprint(\"dtypes:\", \"BOLD\")\n", "print(_.dtypes)" ] }, { "cell_type": "markdown", "id": "b3c9f989", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## save and read from parquet file" ] }, { "cell_type": "code", "execution_count": null, "id": "07b85c42", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], 
"source": [ "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"loading from parquet file\", \"BOLD\")\n", "# store as h5 file - to demonstrate reading in\n", "parq_tmp = get_data_path(\"example\", \"tmp.parquet\")\n", "\n", "# NOTE: fastparquet allows for appending, alternative is pyarrow (or 'auto')\n", "_.to_parquet(parq_tmp, engine=\"fastparquet\")\n", "\n", "# read data from parquet file - using the pyarrow engine\n", "df = DataLoader.load(source=parq_tmp)\n", "\n", "pd.testing.assert_frame_equal(df, _)" ] }, { "cell_type": "markdown", "id": "b4f13f5d", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## save and read from hdf5 file" ] }, { "cell_type": "code", "execution_count": null, "id": "dccc5f4f", "metadata": {}, "outputs": [], "source": [ "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"loading from hdf5 file\", \"BOLD\")\n", "# store as h5 file - to demonstrate reading in\n", "hdf5_tmp = get_data_path(\"example\", \"tmp.h5\")\n", "hdf5_table = \"data\"\n", "with pd.HDFStore(hdf5_tmp, mode=\"w\") as store:\n", " # setting data_columns = True so will be searchable\n", " store.append(key=hdf5_table, value=_, data_columns=True)\n", "\n", "# read data from table in hdf5\n", "df = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table)\n", "\n", "pd.testing.assert_frame_equal(df, _)" ] }, { "cell_type": "markdown", "id": "77c1aba5", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## save and read from netcdf file" ] }, { "cell_type": "code", "execution_count": null, "id": "b67eaba8", "metadata": {}, "outputs": [], "source": [ "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"loading from netcdf file\", \"BOLD\")\n", "\n", "# first - save to file, using multi index for dimensions\n", "tmp_ = _.set_index(['datetime', 'source'])\n", "ds = xr.Dataset.from_dataframe(tmp_)\n", "\n", "netcdf_tmp = get_data_path(\"example\", \"tmp.nc\")\n", "ds.to_netcdf(path=netcdf_tmp)\n", "\n", "# read data from netcdf file\n", "# NOTE: this will comeback with a multi index - can reset with reset_index=True\n", "nc = DataLoader.load(source=netcdf_tmp)\n", "\n", "# netcdf will have nans for missing values\n", "# - netcdf effectively stores values in n-d array with the\n", "# - dimensions determined by an index when converting from DataFrame\n", "# nc.dropna(inplace=True)\n", "\n", "# sort indices (and data) in the same way\n", "tmp_.sort_index(inplace=True)\n", "nc.sort_index(inplace=True)\n", "\n", "# ensure columns are in same order - might not be needed\n", "nc = nc[tmp_.columns]\n", "\n", "pd.testing.assert_frame_equal(tmp_, nc)" ] }, { "cell_type": "markdown", "id": "8a1acebe", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## use 'where' to select subset without having to read entirely into memory\n", "\n", "this is accomplished by using a 'where dict', containing the following keys\n", "- 'col' : the column of the data used for selection\n", "- 'comp': the comparison to used, e.g. 
\">\", \">=\", \"==\", \"!=\", \"<=\", \"<\"\n", "- 'val' : value being compared to column values\n", "\n", "where using hdf5" ] }, { "cell_type": "code", "execution_count": null, "id": "921176ef", "metadata": {}, "outputs": [], "source": [ "\n", "a = _.loc[_['source'] == \"A\"].copy(True)\n", "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"applying 'where' dictionaries - selecting data at source\", \"BOLD\")\n", "\n", "df = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " where={\"col\": \"source\", \"comp\": \"==\", \"val\": \"A\"})\n", "\n", "pd.testing.assert_frame_equal(a, df)\n", "\n", "# hdf5 allows for a list of values to be provided\n", "df = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " where={\"col\": \"source\", \"comp\": \"==\", \"val\": [\"A\", \"B\"]}\n", " )\n", "\n", "# unique doesn't sort?\n", "np.testing.assert_array_equal(np.sort(df['source'].unique()),\n", " np.array(['A', 'B'], dtype=object))\n", "\n", "\n", "# multiple 'where dicts' can be combined in a list\n", "# - they will be combined with an AND operation\n", "df = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " where=[\n", " {\"col\": \"source\", \"comp\": \"==\", \"val\": \"A\"},\n", " {\"col\": \"lat\", \"comp\": \">=\", \"val\": 65.0}\n", " ]\n", " )\n", "\n", "assert df['lat'].min() >= 65.0\n", "\n", "pd.testing.assert_frame_equal(a.loc[a['lat'] >= 65.0], df)" ] }, { "cell_type": "markdown", "id": "439109cb", "metadata": { "lines_to_next_cell": 0 }, "source": [ "where using netcdf files" ] }, { "cell_type": "code", "execution_count": null, "id": "81fdc209", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "\n", "# apply where_dict to netcdf file - must be for the index, in thise case \"source\" or \"datetime\"\n", "nc = DataLoader.load(source=netcdf_tmp,\n", " where=[\n", " {\"col\": \"source\", \"comp\": \"==\", \"val\": \"A\"}\n", " ],\n", " reset_index=True\n", " )\n", "\n", "np.testing.assert_array_equal(nc['source'].unique(), np.array(['A'], dtype=object))" ] }, { "cell_type": "markdown", "id": "03110386", "metadata": { "lines_to_next_cell": 0 }, "source": [ "where using parquet" ] }, { "cell_type": "code", "execution_count": null, "id": "a78b7709", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "\n", "# using parquet / read_parquet the where's get convert to 'filters', see pd.read_parquet for more details\n", "\n", "df = DataLoader.load(source=parq_tmp,\n", " where=[\n", " {\"col\": \"source\", \"comp\": \"==\", \"val\": \"A\"},\n", " {\"col\": \"lat\", \"comp\": \">=\", \"val\": 65.0}\n", " ]\n", " )\n", "assert df['lat'].min() >= 65.0\n", "\n", "pd.testing.assert_frame_equal(a.loc[a['lat'] >= 65.0], df)\n", "\n", "\n", "# NOTE: using querying on datetime / timestamp objects need to convert to datetime via -\n", "# pd.Timestamp\n", "\n", "max_time = pd.Timestamp(\"2020-03-05\")\n", "df = DataLoader.load(source=parq_tmp,\n", " where=[\n", " {\"col\": \"datetime\", \"comp\": \"<=\", \"val\": max_time}\n", " ]\n", " )\n", "\n", "pd.testing.assert_frame_equal(_.loc[_['datetime'] <= max_time], df)" ] }, { "cell_type": "markdown", "id": "84f49dc8", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## use 'row_select' to select subset after data is loaded into memory" ] }, { "cell_type": "code", "execution_count": null, "id": "f5b60402", "metadata": {}, "outputs": [], "source": [ "\n", "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"row_select examples\", \"BOLD\")\n", "\n", "\n", "\n", "# 'where dict' can be used for 
row_select\n", "df0 = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " row_select={\"col\": \"source\", \"comp\": \"==\", \"val\": \"A\"})\n", "\n", "# NOTE: using where is faster\n", "df1 = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " where={\"col\": \"source\", \"comp\": \"==\", \"val\": \"A\"})\n", "\n", "pd.testing.assert_frame_equal(df0, df1)\n", "\n", "# row select allows for using lambda functions that returns a bool array\n", "# - col_args specify the columns of the data to pass in as arguments to \"func\"\n", "df0 = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " row_select={\n", " \"func\": lambda x: x >= 65.0,\n", " \"col_args\": \"lat\"\n", " })\n", "\n", "assert df0['lat'].min() >= 65.0\n", "\n", "# the lambda functions can be supplied as strings - useful when passing parameters from a json configuration\n", "# NOTE: if func is a string it will be converted with eval(...)\n", "df1 = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " row_select={\n", " \"func\": \"lambda x: x >= 65.0\",\n", " \"col_args\": \"lat\"\n", " })\n", "\n", "pd.testing.assert_frame_equal(df0, df1)" ] }, { "cell_type": "markdown", "id": "1b23a7b9", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## Advanced: more row_select and where" ] }, { "cell_type": "code", "execution_count": null, "id": "f2a69a32", "metadata": {}, "outputs": [], "source": [ "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"more row_select and where\", \"BOLD\")\n", "\n", "\n", "# multiple columns can be supplied\n", "df2 = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " row_select={\n", " \"func\": \"lambda x, y: (x >= 65.0) & (y == 'A')\",\n", " \"col_args\": [\"lat\", \"source\"]\n", " })\n", "\n", "assert df2['lat'].min() >= 65.0\n", "np.testing.assert_array_equal(df2['source'].unique(), np.array(['A'], dtype=object))\n", "\n", "\n", "# column values can be supplied via col_kwargs\n", "# - this can be useful if a more involved function is supplied\n", "df2 = DataLoader.load(source=hdf5_tmp,\n", " table=hdf5_table,\n", " row_select={\n", " \"func\": \"lambda x, y: (x >= 65.0) & (y >= 0)\",\n", " \"col_kwargs\": {\n", " \"x\": \"lat\", \"y\": \"lon\"\n", " }\n", " })\n", "\n", "assert df2['lat'].min() >= 65.0\n", "assert df2['lon'].min() >= 0.0\n", "\n", "# row_select can be negated (or inverted) - flipping Trues to False and vice versa\n", "# - this can be useful when defining hold out data\n", "# - e.g. 
{ "cell_type": "markdown", "id": "1b23a7b9", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## Advanced: more row_select and where" ] },
{ "cell_type": "code", "execution_count": null, "id": "f2a69a32", "metadata": {}, "outputs": [], "source": [ "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"more row_select and where\", \"BOLD\")\n", "\n", "# multiple columns can be supplied\n", "df2 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      row_select={\n", "                          \"func\": \"lambda x, y: (x >= 65.0) & (y == 'A')\",\n", "                          \"col_args\": [\"lat\", \"source\"]\n", "                      })\n", "\n", "assert df2['lat'].min() >= 65.0\n", "np.testing.assert_array_equal(df2['source'].unique(), np.array(['A'], dtype=object))\n", "\n", "# column values can be supplied via col_kwargs\n", "# - this can be useful if a more involved function is supplied\n", "df2 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      row_select={\n", "                          \"func\": \"lambda x, y: (x >= 65.0) & (y >= 0)\",\n", "                          \"col_kwargs\": {\"x\": \"lat\", \"y\": \"lon\"}\n", "                      })\n", "\n", "assert df2['lat'].min() >= 65.0\n", "assert df2['lon'].min() >= 0.0\n", "\n", "# row_select can be negated (or inverted) - flipping True to False and vice versa\n", "# - this can be useful when defining hold-out data\n", "# - e.g. create a row_select that selects the desired data, then use negate to make sure it's excluded\n", "df3 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      row_select={\n", "                          \"func\": \"lambda x: (x >= 65.0)\",\n", "                          \"col_args\": \"lat\",\n", "                          \"negate\": True\n", "                      })\n", "\n", "assert df3['lat'].max() < 65.0\n", "\n", "df3 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      row_select={\n", "                          \"func\": \"lambda x, y: (x >= 65.0) & (y >= 0)\",\n", "                          \"col_kwargs\": {\"x\": \"lat\", \"y\": \"lon\"},\n", "                          \"negate\": True\n", "                      })\n", "\n", "# there should be no rows with lat>=65.0 AND lon>=0\n", "assert len(df3.loc[(df3['lat'] >= 65.0) & (df3['lon'] >= 0.0)]) == 0\n", "\n", "# multiple row_selects can be combined via a list\n", "# - similar to where, they are combined via an AND boolean operation\n", "df4 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      row_select=[\n", "                          {\"col\": \"source\", \"comp\": \"==\", \"val\": \"A\"},\n", "                          {\"func\": \"lambda x: (x >= 65.0)\", \"col_args\": \"lat\"},\n", "                          {\"func\": \"lambda y: y >= 0.0\", \"col_kwargs\": {\"y\": \"lon\"}},\n", "                          {\"func\": \"lambda y: y >= 0.0\", \"col_args\": \"z\"}\n", "                      ])\n", "\n", "assert df4['lat'].min() >= 65.0\n", "assert df4['lon'].min() >= 0.0\n", "assert df4['z'].min() >= 0.0\n", "np.testing.assert_array_equal(df4['source'].unique(), np.array(['A'], dtype=object))\n", "\n", "# where and row_select can be used together\n", "# - where dicts are applied first, when reading data in from file\n", "# - row_selects are applied to the data in memory\n", "df5 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      where={\"col\": \"source\", \"comp\": \"==\", \"val\": \"A\"},\n", "                      row_select={\"col\": \"source\", \"comp\": \"==\", \"val\": \"B\"})\n", "assert len(df5) == 0\n", "\n", "# TODO: show where and row_select using netcdf data" ] },
{ "cell_type": "markdown", "id": "83b833ed", "metadata": {}, "source": [ "## Advanced: col_funcs - apply functions to create or modify columns\n", "\n", "column functions are specified with a dict, with each key being the new (or existing) column and the value\n", "- a dict specifying how the column shall be created\n", "- NOTE: by default columns are extracted from the dataframe as numpy arrays (so there is no need to call .values)" ] },
{ "cell_type": "code", "execution_count": null, "id": "15219442", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"applying col(umn)_func(tion)s to data after reading into memory\", \"BOLD\")\n", "\n", "# add a column\n", "df1 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      col_funcs={\n", "                          \"z_pos\": {\n", "                              \"func\": \"lambda x: np.where(x>0, x, np.nan)\",\n", "                              \"col_args\": \"z\"\n", "                          }\n", "                      })\n", "assert np.all(df1.loc[np.isnan(df1['z_pos']), \"z\"] <= 0)\n", "\n", "# modify / create a 'date' column\n", "df1 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      col_funcs={\n", "                          \"date\": {\n", "                              \"func\": \"lambda x: x.astype('datetime64[D]')\",\n", "                              \"col_args\": \"datetime\"\n", "                          }\n", "                      })\n", "\n", "# reference dataframe: column modified after load\n", "df0 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table)\n", "\n", "# convert datetime to date, matching the col_func above\n", "df0['date'] = df0['datetime'].values.astype('datetime64[D]')\n", "\n", "pd.testing.assert_frame_equal(df0, df1)" ] },
{ "cell_type": "markdown", "id": "645bd107", "metadata": { "lines_to_next_cell": 0 }, "source": [ "add_data_to_col, col_select, reset_index" ] },
{ "cell_type": "code", "execution_count": null, "id": "384e943f", "metadata": {}, "outputs": [], "source": [ "# TODO: add a simple example of add_data_to_col" ] },
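{ "cell_type": "markdown", "id": "aa10fe09", "metadata": { "lines_to_next_cell": 0 }, "source": [ "A hedged sketch of col_select and reset_index - this ASSUMES col_select takes a list of column names to keep, as its name suggests; reset_index was shown with the netcdf data above" ] },
{ "cell_type": "code", "execution_count": null, "id": "aa10fe10", "metadata": {}, "outputs": [], "source": [ "\n", "# ASSUMPTION: col_select is a list of columns to keep from the loaded data\n", "df6 = DataLoader.load(source=hdf5_tmp,\n", "                      table=hdf5_table,\n", "                      col_select=[\"datetime\", \"lat\", \"lon\"])\n", "assert set(df6.columns) == {\"datetime\", \"lat\", \"lon\"}\n", "\n", "# reset_index=True drops a multi-index back into columns (see the netcdf example above)\n", "nc = DataLoader.load(source=netcdf_tmp, reset_index=True)\n", "assert \"source\" in nc.columns" ] },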
"markdown", "id": "645bd107", "metadata": { "lines_to_next_cell": 0 }, "source": [ "add_data_to_col, col_select, reset_index" ] }, { "cell_type": "code", "execution_count": null, "id": "384e943f", "metadata": {}, "outputs": [], "source": [ "# TODO: add simple examples of above" ] }, { "cell_type": "markdown", "id": "c9649246", "metadata": { "lines_to_next_cell": 0 }, "source": [ "remove tmp files" ] }, { "cell_type": "code", "execution_count": null, "id": "0e92a0f1", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "\n", "cprint(\"-\"*20, \"BOLD\")\n", "cprint(\"cleaning up: delete tmp files\", \"BOLD\")\n", "\n", "# delete tmp files\n", "for i in [netcdf_tmp, hdf5_tmp, parq_tmp, tsv_file]:\n", " print(f\"removing tmp file: {i}\")\n", " os.remove(i)\n", " assert not os.path.exists(i)" ] } ], "metadata": { "jupytext": { "cell_metadata_filter": "-all", "main_language": "python", "notebook_metadata_filter": "-all" } }, "nbformat": 4, "nbformat_minor": 5 }