Custom destination with BigQuery
Info: The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/custom_destination_bigquery
About this Example
In this example, you'll find a Python script that demonstrates how to load data into BigQuery using a custom destination.
We'll learn how to:
- Use built-in credentials.
- Use the custom destination (a minimal sketch follows this list).
- Use pyarrow tables to create nested column types on BigQuery.
- Use BigQuery autodetect=True for schema inference from parquet files.
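Before diving into the full script, here is a minimal sketch of the custom destination idea (the name my_sink is illustrative, not part of this example). A custom destination is a function decorated with @dlt.destination that dlt calls with the extracted data and the table schema; with batch_size=0 and loader_file_format="parquet", as used below, the function receives the path of each parquet load file instead of deserialized items.
import dlt

@dlt.destination(name="my_sink", loader_file_format="parquet", batch_size=0)
def my_sink(items, table) -> None:
    # with batch_size=0, `items` is the local path of a parquet load file;
    # with batch_size > 0, dlt would pass a list of data items instead
    print(f"received file {items} for table {table['name']}")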
Full source code
import dlt
import pandas as pd
import pyarrow as pa
from google.cloud import bigquery
from dlt.common.configuration.specs import GcpServiceAccountCredentials

# constants
OWID_DISASTERS_URL = (
    "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
    "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
    "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
)

# this table needs to be created manually in your GCP project
# format: "your-project.your_dataset.your_table"
BIGQUERY_TABLE_ID = "chat-analytics-rasa-ci.ci_streaming_insert.natural-disasters"

# dlt sources
@dlt.resource(name="natural_disasters")
def resource(url: str):
    # load pyarrow table with pandas
    table = pa.Table.from_pandas(pd.read_csv(url))
    # we add a list type column to demonstrate bigquery lists
    table = table.append_column(
        "tags",
        pa.array(
            [["disasters", "earthquakes", "floods", "tsunamis"]] * len(table),
            pa.list_(pa.string()),
        ),
    )
    # we add a struct type column to demonstrate bigquery structs
    table = table.append_column(
        "meta",
        pa.array(
            [{"loaded_by": "dlt"}] * len(table),
            pa.struct([("loaded_by", pa.string())]),
        ),
    )
    yield table

# dlt bigquery custom destination
# we can use the dlt provided credentials class
# to retrieve the gcp credentials from the secrets
@dlt.destination(
    name="bigquery", loader_file_format="parquet", batch_size=0, naming_convention="snake_case"
)
def bigquery_insert(
    items, table=BIGQUERY_TABLE_ID, credentials: GcpServiceAccountCredentials = dlt.secrets.value
) -> None:
    client = bigquery.Client(
        credentials.project_id, credentials.to_native_credentials(), location="US"
    )
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.PARQUET,
        schema_update_options=bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
    )
    # since we have set the batch_size to 0, we get a filepath and can load the file directly
    with open(items, "rb") as f:
        load_job = client.load_table_from_file(f, table, job_config=job_config)
    load_job.result()  # Waits for the job to complete.

if __name__ == "__main__":
    # run the pipeline and print load results
    pipeline = dlt.pipeline(
        pipeline_name="csv_to_bigquery_insert",
        destination=bigquery_insert,
        dataset_name="mydata",
        dev_mode=True,
    )
    load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))
    print(load_info)
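To run the script, dlt.secrets.value must resolve to a GCP service account. Besides putting the credentials into .dlt/secrets.toml, one option is to export them as environment variables before starting the pipeline; the sketch below assumes the credentials resolve under the destination.bigquery.credentials section (as for the built-in BigQuery destination) and uses placeholder values only.
import os

# placeholder values; copy the real ones from your service account key file
os.environ["DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID"] = "my-gcp-project"
os.environ["DESTINATION__BIGQUERY__CREDENTIALS__CLIENT_EMAIL"] = "loader@my-gcp-project.iam.gserviceaccount.com"
os.environ["DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY"] = "-----BEGIN PRIVATE KEY-----\n..."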