Readers Source and Filesystem
This verified source easily streams files from AWS S3, Google Cloud Storage, Google Drive, Azure, or local filesystem using the reader source.
Sources and resources that can be used with this verified source are:
Name | Type | Description |
---|---|---|
readers | Source | Lists and reads files with resource filesystem and readers transformers |
filesystem | Resource | Lists files in bucket_url using file_glob pattern |
read_csv | Resource-transformer | Reads csv file with Pandas chunk by chunk |
read_jsonl | Resource-transformer | Reads jsonl file content and extract the data |
read_parquet | Resource-transformer | Reads parquet file content and extract the data with Pyarrow |
Setup Guide
Grab credentials
This source can access various bucket types, including:
- AWS S3.
- Google Cloud Storage.
- Google Drive.
- Azure Blob Storage.
- Local Storage
To access these, you'll need secret credentials:
AWS S3 credentials
To get AWS keys for S3 access:
- Access IAM in AWS Console.
- Select "Users", choose a user, and open "Security credentials".
- Click "Create access key" for AWS ID and Secret Key.
For more info, see AWS official documentation.
Google Cloud Storage / Google Drive credentials
To get GCS/GDrive access:
- Log in to console.cloud.google.com.
- Create a service account.
- Enable "Cloud Storage API" / "Google Drive API"; see Google's guide.
- In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" > "ADD KEY" > "CREATE" to get a JSON credential file.
- Grant the service account appropriate permissions for cloud storage access.
For more info, see how to create service account.
Azure Blob Storage credentials
To obtain Azure blob storage access:
- Go to Azure Portal (portal.azure.com).
- Select "Storage accounts" > your storage.
- Click "Settings" > "Access keys".
- View account name and two keys (primary/secondary). Keep keys confidential.
For more info, see Azure official documentation.
Initialize the verified source
To get started with your data pipeline, follow these steps:
Enter the following command:
dlt init filesystem duckdb
This command will initialize the pipeline example with filesystem as the source and duckdb as the destination.
If you'd like to use a different destination, simply replace
duckdb
with the name of your preferred destination.After running this command, a new directory will be created with the necessary files and configuration settings to get started.
For more information, read the Walkthrough: Add a verified source.
Add credentials
In the
.dlt
folder, there's a file calledsecrets.toml
. It's where you store sensitive information securely, like access tokens. Keep this file safe. Here's its format for service account authentication:[sources.filesystem.credentials] # use [sources.readers.credentials] for the "readers" source
# For AWS S3 access:
aws_access_key_id="Please set me up!"
aws_secret_access_key="Please set me up!"
# For GCS bucket / Google Drive access:
client_email="Please set me up!"
private_key="Please set me up!"
project_id="Please set me up!"
# For Azure blob storage access:
azure_storage_account_name="Please set me up!"
azure_storage_account_key="Please set me up!"Finally, enter credentials for your chosen destination as per the docs.
You can pass the bucket URL and glob pattern or use
config.toml
. For local filesystems, usefile://
as follows:[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url='file://Users/admin/Documents/csv_files'
file_glob="*"or skip the schema and provide the local path in a format native for your operating system as follows:
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url='~\Documents\csv_files\'
file_glob="*"In the example above we use Windows path to current user's Documents folder. Mind that literal toml string (single quotes) was used to conveniently use the backslashes without need to escape.
For remote file systems you need to add the schema, it will be used to get the protocol being used. The protocols that can be used are:
For Azure blob storage
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url="az://<container_name>/<path_to_files>/"az://
indicates the Azure Blob Storage protocol.container_name
is the name of the container.path_to_files/
is a directory path within the container.CAUTION: For Azure, use adlfs>=2023.9.0. Older versions mishandle globs.
For Google Drive
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url="gdrive://<folder_name>/<subfolder_or_file_path>/"gdrive://
indicates that the Google Drive protocol.folder_name
refers to a folder within Google Drive.subfolder_or_file_path/
is a sub-folder or directory path within the my-bucket folder.
For Google Storage
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url="gs://<bucket_name>/<path_to_files>/"gs://
indicates the Google Cloud Storage protocol.bucket_name
is the name of the bucket.path_to_files/
is a directory path within the bucket.
For AWS S3
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url="s3://<bucket_name>/<path_to_files>/"s3://
indicates the AWS S3 protocol.bucket_name
is the name of the bucket.path_to_files/
is a directory path within the bucket.
Use local file system paths
You can use both native local file system paths and in form of file:
uri. Absolute, relative and UNC Windows paths are supported.
You can find relevant examples in filesystem destination documentation which follows
the same rules to specify the bucket_url
.
Windows supports paths up to 255 characters. When you access a path longer than 255 characters you'll see FileNotFound
exception.
To go over this limit you can use extended paths. Note that Python glob does not work with extended UNC paths so you will not be able to use them
[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'
Run the pipeline
Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:
pip install -r requirements.txt
Install optional modules:
- For AWS S3:
pip install s3fs
- For Azure blob:
pip install adlfs>=2023.9.0
- GCS storage: No separate module needed.
- For AWS S3:
You're now ready to run the pipeline! To get started, run the following command:
python filesystem_pipeline.py
Once the pipeline has finished running, you can verify that everything loaded correctly by using the following command:
dlt pipeline <pipeline_name> show
For example, the
pipeline_name
for the above pipeline example isstandard_filesystem
, you may also use any custom name instead.
For more information, read the Walkthrough: Run a pipeline.
Sources and resources
dlt
works on the principle of sources and
resources.
Source readers
This source offers chunked file readers as resources, which can be optionally customized. Provided resources include:
read_csv()
read_jsonl()
read_parquet()
@dlt.source(_impl_cls=ReadersSource, spec=FilesystemConfigurationResource)
def readers(
bucket_url: str = dlt.secrets.value,
credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
file_glob: Optional[str] = "*",
) -> Tuple[DltResource, ...]:
...
bucket_url
: The url to the bucket.credentials
: The credentials to the filesystem of fsspecAbstractFilesystem
instance.file_glob
: Glob filter for files. Defaults to non-recursive listing in the bucket.
We advise that you give each resource a
specific name
before loading with pipeline.run
. This will make sure that data goes to a table with the name you
want and that each pipeline uses a
separate state for incremental loading.
Resource filesystem
This resource lists files in bucket_url
based on the file_glob
pattern, returning them as
FileItem
with data access methods. These can be paired with transformers for enhanced processing.
@dlt.resource(
primary_key="file_url", spec=FilesystemConfigurationResource, standalone=True
)
def filesystem(
bucket_url: str = dlt.secrets.value,
credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
file_glob: Optional[str] = "*",
files_per_page: int = DEFAULT_CHUNK_SIZE,
extract_content: bool = False,
) -> Iterator[List[FileItem]]:
...
bucket_url
: URL of the bucket.credentials
: Filesystem credentials ofAbstractFilesystem
instance.file_glob
: File filter in glob format. Defaults to listing all non-recursive files in bucket URL.files_per_page
: Number of files processed at once. Default: 100.extract_content
: If true, the content of the file will be read and returned in the resource. Default: False.
Filesystem Integration and Data Extraction Guide
Filesystem Usage
The filesystem tool enumerates files in a selected bucket using a glob pattern, returning details as FileInfo in customizable page sizes.
This resource integrates with transform functions and transformers for customized extraction pipelines.
To load data into a specific table (instead of the default filesystem table), see the snippet below:
@dlt.transformer(standalone=True)
def read_csv(items, chunksize: int = 15):
"""Reads csv file with Pandas chunk by chunk."""
...
# list only the *.csv in specific folder and pass the file items to read_csv()
met_files = (
filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv")
| read_csv()
)
# load to met_csv table using with_name()
pipeline.run(met_files.with_name("csv_data"))
Use the standalone filesystem resource to list files in s3, GCS, and Azure buckets. This allows you to customize file readers or manage files using fsspec.
files = filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv")
pipeline.run(files)
The filesystem ensures consistent file representation across bucket types and offers methods to access and read data. You can quickly build pipelines to:
- Extract text from PDFs.
- Stream large file content directly from buckets.
- Copy files locally.
FileItem
Representation
- All dlt sources/resources that yield files follow the FileItem contract.
- File content is typically not loaded; instead, full file info and methods to access content are available.
- Users can request an authenticated fsspec AbstractFileSystem instance.
FileItem
Fields:
file_url
- Complete URL of the file; also the primary key (e.g.s3://bucket-name/path/file
).file_name
- Name of the file from the bucket URL.relative_path
- Set when doingglob
, is a relative path to abucket_url
argument.mime_type
- File's mime type; sourced from the bucket provider or inferred from its extension.modification_date
- File's last modification time (format:pendulum.DateTime
).size_in_bytes
- File size.file_content
- Content, provided upon request.
When using a nested or recursive glob pattern, relative_path
will include the file's path relative to bucket_url
. For
instance, using the resource:
filesystem("az://dlt-ci-test-bucket/standard_source/samples", file_glob="met_csv/A801/*.csv")
will produce file names relative to the /standard_source/samples
path, such as
met_csv/A801/A881_20230920.csv
. For local filesystems, POSIX paths (using "/" as separator) are returned.
File Manipulation
FileItem, backed by a dictionary implementation, offers these helper methods:
read_bytes()
: Returns the file content as bytes.open()
: Provides a file object when opened.filesystem
: Gives access to an authorizedAbstractFilesystem
with standard fsspec methods.
Customization
Create your own pipeline
If you wish to create your own pipelines, you can leverage source and resource methods from this verified source.
Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:
pipeline = dlt.pipeline(
pipeline_name="standard_filesystem", # Use a custom name if desired
destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post)
dataset_name="filesystem_data_csv" # Use a custom name if desired
)To read and load CSV files:
BUCKET_URL = "YOUR_BUCKET_PATH_HERE" # path of the bucket url or local destination
met_files = readers(
bucket_url=BUCKET_URL, file_glob="directory/*.csv"
).read_csv()
# tell dlt to merge on date
met_files.apply_hints(write_disposition="merge", merge_key="date")
# We load the data into the met_csv table
load_info = pipeline.run(met_files.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)The
file_glob
parameter targets all CSVs in the "met_csv/A801" directory.The
print(pipeline.last_trace.last_normalize_info)
line displays the data normalization details from the pipeline's last trace.infoIf you have a default bucket URL set in
.dlt/config.toml
, you can omit thebucket_url
parameter.
To load only new CSV files with incremental loading:
# This configuration will only consider new csv files
new_files = filesystem(bucket_url=BUCKET_URL, file_glob="directory/*.csv")
# add incremental on modification time
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)To read and load Parquet and JSONL from a bucket:
jsonl_reader = readers(BUCKET_URL, file_glob="**/*.jsonl").read_jsonl(
chunksize=10000
)
# PARQUET reading
parquet_reader = readers(BUCKET_URL, file_glob="**/*.parquet").read_parquet()
# load both folders together to specified tables
load_info = pipeline.run(
[
jsonl_reader.with_name("jsonl_data"),
parquet_reader.with_name("parquet_data"),
]
)
print(load_info)
print(pipeline.last_trace.last_normalize_info)- The
file_glob
: Specifies file pattern; reads all JSONL and Parquet files across directories. - The
chunksize
: Set to 10,000; data read in chunks of 10,000 records each. print(pipeline.last_trace.last_normalize_info)
: Displays the data normalization details from the pipeline's last trace.
- The
To set up a pipeline that reads from an Excel file using a standalone transformer:
# Define a standalone transformer to read data from an Excel file.
@dlt.transformer(standalone=True)
def read_excel(
items: Iterator[FileItemDict], sheet_name: str
) -> Iterator[TDataItems]:
# Import the required pandas library.
import pandas as pd
# Iterate through each file item.
for file_obj in items:
# Open the file object.
with file_obj.open() as file:
# Read from the Excel file and yield its content as dictionary records.
yield pd.read_excel(file, sheet_name).to_dict(orient="records")
# Set up the pipeline to fetch a specific Excel file from a filesystem (bucket).
example_xls = filesystem(
bucket_url=BUCKET_URL, file_glob="../directory/example.xlsx"
) | read_excel("example_table") # Pass the data through the transformer to read the "example_table" sheet.
# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = dlt.run(
example_xls.with_name("example_xls_data"),
destination="duckdb",
dataset_name="example_xls_data",
)
# Print the loading information.
print(load_info)The code loads data from
example.xlsx
into theduckdb
destination.To copy files locally, add a step in the filesystem resource and then load the listing to the database:
def _copy(item: FileItemDict) -> FileItemDict:
# instantiate fsspec and copy file
dest_file = os.path.join(local_folder, item["file_name"])
# create dest folder
os.makedirs(os.path.dirname(dest_file), exist_ok=True)
# download file
item.fsspec.download(item["file_url"], dest_file)
# return file item unchanged
return item
# use recursive glob pattern and add file copy step
downloader = filesystem(BUCKET_URL, file_glob="**").add_map(_copy)
# NOTE: you do not need to load any data to execute extract, below we obtain
# a list of files in a bucket and also copy them locally
listing = list(downloader)
print(listing)
# download to table "listing"
load_info = pipeline.run(
downloader.with_name("listing"), write_disposition="replace"
)
# pretty print the information on data that was loaded
print(load_info)
print(listing)
print(pipeline.last_trace.last_normalize_info)Cleanup after loading:
You can get a fsspec client from filesystem resource after it was extracted i.e. in order to delete processed files etc. The filesystem module contains a convenient method
fsspec_from_resource
that can be used as follows:from filesystem import filesystem, fsspec_from_resource
# get filesystem source
gs_resource = filesystem("gs://ci-test-bucket/")
# extract files
pipeline.run(gs_resource | read_csv)
# get fs client
fs_client = fsspec_from_resource(gs_resource)
# do any operation
fs_client.ls("ci-test-bucket/standard_source/samples")
Additional Setup guides
- Load data from IFTTT to Google Cloud Storage in python with dlt
- Load data from IFTTT to The Local Filesystem in python with dlt
- Load data from Google Cloud Storage to AWS S3 in python with dlt
- Load data from Star Trek to The Local Filesystem in python with dlt
- Load data from X to AWS S3 in python with dlt
- Load data from SAP HANA to AWS S3 in python with dlt
- Load data from AWS S3 to AlloyDB in python with dlt
- Load data from AWS S3 to PostgreSQL in python with dlt
- Load data from Crypt API to Azure Cloud Storage in python with dlt
- Load data from SAP HANA to The Local Filesystem in python with dlt