Code examples
Learn how to use the DAP Client Library.
The DAP client library follows the asynchronous programming paradigm and makes use of the Python keywords async and await. The examples below have to be executed in an asynchronous context, which you can enter by invoking asyncio.run. By default, a Python script runs in a synchronous context; you must wrap the examples below in an async function, or you will get a syntax error.
Initializing the library
First, we need to instantiate the DAPClient class:
However, DAPClient can automatically extract the value of these parameters from the above environment variables, allowing us to write:
Note that DAPClient uses an asynchronous context manager. Keywords such as async with are permitted only in an asynchronous context; we can enter such a context by invoking asyncio.run(my_function(arg1, arg2, ...)).
Let’s explore a few common use cases with DAPClient.
Obtaining the latest schema
Before we obtain data, we need to get the latest schema of a table. The following example retrieves the schema of the table accounts in the namespace canvas as a JSON schema object. A JSON object is a recursive Python data structure whose outermost layer is a Python dict whose keys are strings (type str) and whose values are JSON objects. We can use the Python package jsonschema to validate data against this JSON schema.
We can also save the schema to a file.
Fetching table data with a snapshot query
In order to get an initial copy of the full table contents, we need to perform a snapshot query. The parameter format determines the output data format: CSV, TSV, JSONL or Parquet. We recommend JSONL or Parquet. With JSONL, each line in the output can be parsed into a JSON object conforming to the JSON schema returned above.
Getting latest changes with an incremental query
Once an initial snapshot has been obtained, we need to keep the data synchronized with DAP. This is possible with incremental queries. The following, more complex example gets all changes since a specified since timestamp, and saves each data file on the server to an output file in the local filesystem. The last_seen timestamp is typically the until returned by a previous incremental query.
Replicating data to a database
Earlier sections have shown how to obtain the latest schema, fetch data with a snapshot query, or get the latest changes with an incremental query. These are low-level operations that give you full control over what you do with the data.
However, in most cases we want high-level operations that ensure our database (either running locally or in the cloud) is synchronized with the data in DAP, without paying attention to the specifics of data transfer. This is possible with two operations: one that initializes a database, and one that synchronizes a database with the data in DAP.
In order to replicate data in DAP locally, we must first initialize a database:
Initialization creates a database schema for the DAP namespace, and a corresponding database table for each DAP table. In addition, it creates a meta-table, which is a special database table that holds synchronization information, e.g. the last time the data was synchronized with DAP, and the schema version that the locally stored data conforms to. Finally, it issues a snapshot query to DAP API, and populates the database table with output returned by the snapshot query.
Synchronizing data with a database
Once the table has been initialized, it can be kept up to date using the synchronize operation:
This inspects the information in the meta-table, and issues an incremental query to the DAP API with a since timestamp corresponding to the last synchronization time. Based on the results of the incremental query, it inserts, updates and deletes records in the local database, mirroring records that have been added to, updated in, or removed from the DAP service.
If the local schema version in the meta-table is identical to the remote schema version in DAP, inserting, updating and deleting records proceeds normally. However, if there is a mismatch, the table structure of the local database has to evolve to match the current structure of the data in DAP. This includes the following schema changes in the back-end:
A new required (a.k.a. non-nullable) field (column) is added. The new field has a default value assigned to it in the schema.
A new optional (a.k.a. nullable) field (column) is added to a table.
A new enumeration value is added to an existing enumeration type.
A new enumeration type is introduced.
A field (column) is removed from a table.
Behind the scenes, the client library uses SQL statements such as ALTER TABLE ... ADD COLUMN ... or ALTER TYPE ... ADD VALUE ... to replicate schema changes in DAP in our local database. If a JSON schema change couldn't be mapped to a series of such SQL statements, the client library wouldn't be able to synchronize with DAP using incremental queries, and would have to issue an expensive snapshot query.
Once the local database table structure has been reconciled with the new schema in DAP, and the meta-table has been updated, data synchronization proceeds normally with insert, update and delete SQL statements.
Dropping data from a database
If the table is no longer needed, it can be dropped from the database using the following code:
Configure log level for debugging
The client library uses the Python logging module to log messages. The default log level is INFO. You can change the log level by adding the following code to the beginning of your script; it also adds a timestamp to each log message.