Tutorial

Pre-setup

[2]:
import os
import logging
import pandas
from datetime import datetime
from google.cloud import bigquery, storage
from google_pandas_load import Loader, LoaderQuickSetup
from google_pandas_load import LoadConfig
[3]:
project_id = 'dmp-y-tests'
dataset_name = 'tmp'
bucket_name = 'bucket_gpl'
# gs_dir_path is the path in
# the bucket of the directory that
# will contain the data in Storage.
gs_dir_path = 'gpl_dir/subdir'
local_dir_path = '/tmp/gpl_directory'
[4]:
if not os.path.isdir(local_dir_path):
    os.makedirs(local_dir_path)

Set up a loader

Throughout this document, we call a loader an instance of google_pandas_load.Loader or of google_pandas_load.LoaderQuickSetup.

We emphasize that the second class is a child of the first one.

The next two sections are devoted to creating instances of these two classes through their main parameters, which are data locations.

the low-level way

To set up a loader the low-level way, use google_pandas_load.Loader.

In the following code cell, credentials are inferred from the environment.

[5]:
# the bq_client executes the load jobs' cloud parts:
# the execution of queries, the extraction of BigQuery
# tables to Storage and the load of tables from Storage
# to BigQuery.
bq_client = bigquery.Client(
    project=project_id,
    credentials=None)

# the dataset_ref pointing to the dataset to store the data
# in BigQuery.
dataset_ref = bigquery.DatasetReference(
    project=project_id,
    dataset_id=dataset_name)

# the gs_client is used to instantiate a bucket.
gs_client = storage.Client(
    project=project_id,
    credentials=None)
# the bucket to store the data in Storage.
bucket = storage.Bucket(
    client=gs_client,
    name=bucket_name)

gpl = Loader(
    bq_client=bq_client,
    dataset_ref=dataset_ref,
    bucket=bucket,
    gs_dir_path=gs_dir_path,
    local_dir_path=local_dir_path)

In the setup above, the bq_client, the dataset_ref and the gs_client share the same project_id. Furthermore, the bq_client and the gs_client share the same credentials. However, neither the project_id nor the credentials are required to be the same.

In order to execute load jobs with all possible sources and destinations, the bq_client must have read and write access to both the dataset and the bucket.

To store the data directly in the bucket’s root directory in Storage, set the gs_dir_path parameter to None.
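
For instance, such a loader could be set up as follows (a minimal sketch reusing the bq_client, dataset_ref and bucket defined in the cell above; the name gpl_root is only illustrative):

# same setup as gpl, except that blobs are written at the
# bucket's root instead of inside gpl_dir/subdir.
gpl_root = Loader(
    bq_client=bq_client,
    dataset_ref=dataset_ref,
    bucket=bucket,
    gs_dir_path=None,
    local_dir_path=local_dir_path)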

the quick way

To set up a loader quickly, use google_pandas_load.LoaderQuickSetup.

The code behind the instantiation is essentially the same as in the previous cell.

Contrary to the low-level way, the bq_client, the dataset_ref and the gs_client necessarily share the same project_id. Moreover, the bq_client and the gs_client share the same credentials.

[6]:
gpl_quick_setup = LoaderQuickSetup(
    project_id=project_id,
    dataset_name=dataset_name,
    bucket_name=bucket_name,
    gs_dir_path=gs_dir_path,
    credentials=None,
    local_dir_path=local_dir_path)

A simple download

[7]:
df = gpl.load(
    source='query',
    destination='dataframe',
    query='select 1 as x')

df
[7]:
x
0 1

A simple upload

[8]:
gpl.load(
    source='dataframe',
    destination='bq',
    data_name='a0',
    dataframe=df)

This command has created the following table in BigQuery:

[image0: screenshot of the created table in the BigQuery console]

Its id in BigQuery is project_id.dataset_name.a0, where project_id is the dataset’s project id.
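
As a quick check, this id can be rebuilt from the dataset_ref defined earlier (a minimal sketch, not part of the library):

# reconstruct the table id from the dataset reference.
table_id = f'{dataset_ref.project}.{dataset_ref.dataset_id}.a0'
print(table_id)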

Basic loading mechanism

It is explained in the documentation of the method google_pandas_load.Loader.load().

More examples

from query to gs

[9]:
gpl.load(
    source='query',
    destination='gs',
    data_name='a0',
    query='select 5 as y')

from gs to local

[10]:
gpl.load(
    source='gs',
    destination='local',
    data_name='a0')

from local to dataframe

[11]:
df = gpl.load(
    source='local',
    destination='dataframe',
    data_name='a0')

from dataframe to gs

[12]:
gpl.load(
    source='dataframe',
    destination='gs',
    data_name='a0',
    dataframe=df)

from gs to bq

[13]:
gpl.load(
    source='gs',
    destination='bq',
    data_name='a0')

List data

[14]:
query = """
select * from
(select 'Hello, ' as x from unnest(generate_array(1, 4000)))
cross join
(select 'World!' as y from unnest(generate_array(1, 4000)))
"""

gpl.load(
    source='query',
    destination='gs',
    data_name='a0',
    query=query)

To list this data, named a0, in Storage:

[15]:
gpl.list_blobs(data_name='a0')
[15]:
[<Blob: bucket_gpl, gpl_dir/subdir/a0-000000000000.csv.gz, 1618324295941852>,
 <Blob: bucket_gpl, gpl_dir/subdir/a0-000000000001.csv.gz, 1618324296126273>]

It is also possible to list the blob uris:

[16]:
gpl.list_blob_uris(data_name='a0')
[16]:
['gs://bucket_gpl/gpl_dir/subdir/a0-000000000000.csv.gz',
 'gs://bucket_gpl/gpl_dir/subdir/a0-000000000001.csv.gz']

In this example, the data turned out to be large enough that BigQuery split it into several files in Storage.

Let us move this data into the local folder:

[17]:
gpl.load(
    source='gs',
    destination='local',
    data_name='a0')

To list this data, named a0, in the local folder:

[18]:
gpl.list_local_file_paths(data_name='a0')
[18]:
['/tmp/gpl_directory/a0-000000000001.csv.gz',
 '/tmp/gpl_directory/a0-000000000000.csv.gz']
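
If needed, the downloaded files can also be read back manually with plain pandas (a minimal sketch, not part of the library):

file_paths = gpl.list_local_file_paths(data_name='a0')
# each file is a gzipped CSV; pandas infers the compression
# from the .gz extension.
df = pandas.concat(
    [pandas.read_csv(p) for p in file_paths],
    ignore_index=True)
df.shape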

Check data existence

[19]:
print(gpl.exist_in_local(data_name='a1'))

gpl.load(
    source='query',
    destination='local',
    data_name='a1',
    query='select 2')

print(gpl.exist_in_local(data_name='a1'))
False
True

Delete data

[20]:
gpl.load(
    source='query',
    destination='gs',
    data_name='a1',
    query='select 2')

print(gpl.exist_in_gs(data_name='a1'))
gpl.delete_in_gs(data_name='a1')
print(gpl.exist_in_gs(data_name='a1'))
True
False
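
The same pattern presumably applies to the other locations. The sketch below assumes a delete_in_local method symmetric to delete_in_gs; check the Loader API reference before relying on it:

# exist_in_local is used earlier in this tutorial;
# delete_in_local is assumed here by symmetry with delete_in_gs.
print(gpl.exist_in_local(data_name='a1'))
gpl.delete_in_local(data_name='a1')
print(gpl.exist_in_local(data_name='a1'))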

Cast data

cast data into pandas

[21]:
query = """
select 5 as x, 5 as y, 5 as z
"""
dtype = {
    'x': str,
    'y': float}

df = gpl.load(
    source='query',
    destination='dataframe',
    query=query,
    dtype=dtype)

df
[21]:
x y z
0 5 5.0 5
[22]:
df.dtypes
[22]:
x     object
y    float64
z      int64
dtype: object

To cast a column into the datetime.datetime type, use the parse_dates parameter.

[23]:
query = """
select
cast('2012-11-14 14:32:30' as TIMESTAMP) as x,
'2013-11-14 14:32:30.100121' as y,
'2012-11-14' as z
"""

df = gpl.load(
    source='query',
    destination='dataframe',
    query=query,
    parse_dates=['x', 'y', 'z'])

df
[23]:
x y z
0 2012-11-14 14:32:30+00:00 2013-11-14 14:32:30.100121 2012-11-14
[24]:
df.dtypes
[24]:
x    datetime64[ns, UTC]
y         datetime64[ns]
z         datetime64[ns]
dtype: object

cast data into BigQuery

This section describes the schema of the BigQuery table when destination = ‘bq’.

If source = ‘query’, the bq_schema argument is ignored; the schema is inferred from the query.

Otherwise, one can use this argument to provide an explicit schema.

[25]:
df = pandas.DataFrame(data={'x': [7, 8], 'y': ['a', 'b']})

gpl.load(
    source='dataframe',
    destination='gs',
    data_name='a0',
    dataframe=df)


bq_schema = [bigquery.SchemaField(name='x', field_type='FLOAT'),
             bigquery.SchemaField(name='y', field_type='STRING')]

gpl.load(
    source='gs',
    destination='bq',
    data_name='a0',
    bq_schema=bq_schema)

table_ref = dataset_ref.table(table_id='a0')
table = bq_client.get_table(table_ref)
table.schema
[25]:
[SchemaField('x', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('y', 'STRING', 'NULLABLE', None, (), None)]

If source is one of ‘gs’ or ‘local’ and the bq_schema is not passed, it falls back to a value inferred from the CSV files with google.cloud.bigquery.job.LoadJobConfig.autodetect.

[26]:
df = pandas.DataFrame(data={'x': [7, 8], 'y': ['a', 'b']})

gpl.load(
    source='dataframe',
    destination='gs',
    data_name='a0',
    dataframe=df)

gpl.load(
    source='gs',
    destination='bq',
    data_name='a0')

table_ref = dataset_ref.table(table_id='a0')
table = bq_client.get_table(table_ref)
table.schema
[26]:
[SchemaField('x', 'INTEGER', 'NULLABLE', None, (), None),
 SchemaField('y', 'STRING', 'NULLABLE', None, (), None)]

If source = ‘dataframe’ and the bq_schema is not passed, it falls back to a value inferred from the dataframe with google_pandas_load.load_config.LoadConfig.bq_schema_inferred_from_dataframe.

[27]:
dt = datetime.strptime(
    '2003-11-14 14:32:30.100121',
    '%Y-%m-%d %H:%M:%S.%f')
df = pandas.DataFrame(
    data={
        'w': [8.0],
        'x': ['e'],
        'y': ['2018-01-01'],
        'z': [dt]})

gpl.load(
    source='dataframe',
    destination='bq',
    data_name='a0',
    dataframe=df,
    date_cols=['y'],
    timestamp_cols=['z'])

table_ref = dataset_ref.table(table_id='a0')
table = bq_client.get_table(table_ref)
table.schema
[27]:
[SchemaField('w', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('x', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('y', 'DATE', 'NULLABLE', None, (), None),
 SchemaField('z', 'TIMESTAMP', 'NULLABLE', None, (), None)]

Multi load

[28]:
config1 = LoadConfig(
    source='query',
    destination='dataframe',
    query='select 1 as x')


df = pandas.DataFrame(data={'x': [3]})
config2 = LoadConfig(
    source='dataframe',
    destination='local',
    data_name='a0',
    dataframe=df)

load_results = gpl.mload(configs=[config1, config2])
[29]:
load_results[0]
[29]:
x
0 1
[30]:
print(load_results[1])
None

Monitoring

monitor a load job

[31]:
xload_result = gpl.xload(
    source='query',
    destination='dataframe',
    query='select 11 as x')
[32]:
xload_result.load_result
[32]:
x
0 11
[33]:
print(xload_result.data_name)
print(xload_result.duration)
print(xload_result.durations)
print(xload_result.query_cost)
20210413163210_099148_rand52396115734164180921241296642625583567
3
Namespace(bq_to_gs=2, dataframe_to_local=None, gs_to_bq=None, gs_to_local=0, local_to_dataframe=0, local_to_gs=None, query_to_bq=1)
0.0
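
The durations object above is a Namespace; each step’s duration can be read as an attribute, for example:

# duration of the query execution step, taken from the
# Namespace printed above.
xload_result.durations.query_to_bq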

monitor a multi load job

[34]:
config1 = LoadConfig(
    source='query',
    destination='dataframe',
    query='select 1 as x')


df = pandas.DataFrame(data={'x': [3]})
config2 = LoadConfig(
    source='dataframe',
    destination='local',
    data_name='a0',
    dataframe=df)

xmload_result = gpl.xmload(configs=[config1, config2])
[35]:
xmload_result.load_results
[35]:
[   x
 0  1,
 None]
[36]:
print(xmload_result.data_names)
print(xmload_result.duration)
print(xmload_result.durations)
print(xmload_result.query_cost)
print(xmload_result.query_costs)
['20210413163214_435066_rand249239221017440999351360016787218234830', 'a0']
4
Namespace(bq_to_gs=3, dataframe_to_local=0, gs_to_bq=None, gs_to_local=0, local_to_dataframe=0, local_to_gs=None, query_to_bq=1)
0.0
[0.0, None]

Logging

The logger creating google_pandas_load.Loader’s log records is named Loader and is controlled, as usual, by the application code.

[37]:
import logging
logger = logging.getLogger('Loader')
logger.setLevel(level=logging.DEBUG)
ch = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(name)s - %(levelname)s - %(message)s')
ch.setFormatter(fmt=formatter)
logger.addHandler(hdlr=ch)
[38]:
df = gpl.load(
    source='query',
    destination='dataframe',
    query='select 1 as x')
Loader - DEBUG - Starting query to bq...
Loader - DEBUG - Ended query to bq [1s, 0.0$]
Loader - DEBUG - Starting bq to gs...
Loader - DEBUG - Ended bq to gs [2s]
Loader - DEBUG - Starting gs to local...
Loader - DEBUG - Ended gs to local [0s]
Loader - DEBUG - Starting local to dataframe...
Loader - DEBUG - Ended local to dataframe [0s]

The logger creating google_pandas_load.LoaderQuickSetup’s log records is named LoaderQuickSetup. Contrary to the logger Loader, it already has a built-in console handler. Therefore, without any logging setup, log records are displayed in the console. This is convenient when working in notebooks.

[39]:
df = gpl_quick_setup.load(
    source='query',
    destination='dataframe',
    query='select 1 as x')
2021-04-13 16:32:24,369 - LoaderQuickSetup - DEBUG - Starting query to bq...
2021-04-13 16:32:26,132 - LoaderQuickSetup - DEBUG - Ended query to bq [1s, 0.0$]
2021-04-13 16:32:26,139 - LoaderQuickSetup - DEBUG - Starting bq to gs...
2021-04-13 16:32:27,990 - LoaderQuickSetup - DEBUG - Ended bq to gs [1s]
2021-04-13 16:32:27,993 - LoaderQuickSetup - DEBUG - Starting gs to local...
2021-04-13 16:32:28,467 - LoaderQuickSetup - DEBUG - Ended gs to local [0s]
2021-04-13 16:32:28,470 - LoaderQuickSetup - DEBUG - Starting local to dataframe...
2021-04-13 16:32:28,485 - LoaderQuickSetup - DEBUG - Ended local to dataframe [0s]

In order to avoid duplicate log records in the console, the LoaderQuickSetup logger is set by default not to propagate its log records to its ancestor loggers.
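
This relies on the standard propagate attribute of the Python logging module; a minimal sketch of inspecting or changing it:

# the propagate flag controls whether records are passed to
# ancestor loggers; LoaderQuickSetup sets it to False by default.
print(logging.getLogger('LoaderQuickSetup').propagate)
# re-enabling propagation may duplicate console output if an
# ancestor logger also has a console handler:
# logging.getLogger('LoaderQuickSetup').propagate = True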

Both google_pandas_load.Loader and google_pandas_load.LoaderQuickSetup have a logger parameter. The default values are respectively the Loader logger and the LoaderQuickSetup logger. In both cases, the parameter can be set to another logger.
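
For example, a loader could be given a custom logger as follows (a minimal sketch; the logger name my_app.gpl is only illustrative):

custom_logger = logging.getLogger('my_app.gpl')

# same setup as gpl, but log records are emitted through
# the custom logger instead of the default Loader logger.
gpl_custom = Loader(
    bq_client=bq_client,
    dataset_ref=dataset_ref,
    bucket=bucket,
    gs_dir_path=gs_dir_path,
    local_dir_path=local_dir_path,
    logger=custom_logger)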