
The community-supported Teradata package provides an integration with Teradata Vantage.

For more information, see the dagster-teradata GitHub repository.

dagster-teradata with Teradata Vantage

This guide walks through integrating Dagster with Teradata Vantage to build and manage ETL pipelines, with step-by-step instructions for installing and configuring the required packages, setting up a Dagster project, and implementing a pipeline that interacts with Teradata Vantage.

Prerequisites

  • Access to a Teradata Vantage instance.

    note

    If you need a test instance of Vantage, you can provision one for free at https://clearscape.teradata.com

  • Python 3.9 or higher (Python 3.13 is recommended).

  • pip

Install dagster-teradata

With your virtual environment active, the next step is to install dagster and the Teradata provider package (dagster-teradata) to interact with Teradata Vantage.

  1. Install the Required Packages:

    uv add dagster-teradata
  2. Note about Optional Dependencies:

    a) dagster-teradata relies on dagster-aws for ingesting data from an S3 bucket into Teradata Vantage. Since dagster-aws is an optional dependency, users can install it by running:

    uv add dagster-teradata[aws]

    b) dagster-teradata also relies on dagster-azure for ingesting data from an Azure Blob Storage container into Teradata Vantage. To install this dependency, run:

    uv add dagster-teradata[azure]
  3. Verify the Installation:

    To confirm that Dagster is correctly installed, run:

    dagster --version

    If the installation was successful, this command prints the installed Dagster version.

Initialize a Dagster Project

Now that you have the necessary packages installed, the next step is to create a new Dagster project.

Scaffold a New Dagster Project

Run the following command:

dagster project scaffold --name dagster-quickstart

This command will create a new project named dagster-quickstart. It will automatically generate the following directory structure:

dagster-quickstart
│   pyproject.toml
│   README.md
│   setup.cfg
│   setup.py
│
├───dagster_quickstart
│       assets.py
│       definitions.py
│       __init__.py
│
└───dagster_quickstart_tests
        test_assets.py
        __init__.py

See the Dagster documentation to learn more about this directory structure.

Create Sample Data

To simulate an ETL pipeline, create a CSV file with sample data that your pipeline will process.

Create the CSV File: Inside the dagster_quickstart/data/ directory, create a file named sample_data.csv with the following content:

id,name,age,city
1,Alice,28,New York
2,Bob,35,San Francisco
3,Charlie,42,Chicago
4,Diana,31,Los Angeles

This file represents sample data that will be used as input for your ETL pipeline.
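
If you prefer to create the directory and file programmatically, the following sketch (run from the project root) creates the data directory and writes the same rows:

from pathlib import Path

# Create dagster_quickstart/data/ and write the sample CSV used by the pipeline.
data_dir = Path("dagster_quickstart/data")
data_dir.mkdir(parents=True, exist_ok=True)
(data_dir / "sample_data.csv").write_text(
    "id,name,age,city\n"
    "1,Alice,28,New York\n"
    "2,Bob,35,San Francisco\n"
    "3,Charlie,42,Chicago\n"
    "4,Diana,31,Los Angeles\n"
)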

Define Assets for the ETL Pipeline

Now, we'll define a series of assets for the ETL pipeline inside the assets.py file.

Edit the assets.py File: Open the dagster_quickstart/assets.py file and add the following code to define the pipeline:

import pandas as pd
from dagster import asset

@asset(required_resource_keys={"teradata"})
def read_csv_file(context):
    df = pd.read_csv("dagster_quickstart/data/sample_data.csv")
    context.log.info(df)
    return df

@asset(required_resource_keys={"teradata"})
def drop_table(context):
    result = context.resources.teradata.drop_table(["tmp_table"])
    context.log.info(result)

@asset(required_resource_keys={"teradata"})
def create_table(context, drop_table):
    result = context.resources.teradata.execute_query(
        """CREATE TABLE tmp_table (
            id INTEGER,
            name VARCHAR(50),
            age INTEGER,
            city VARCHAR(50));"""
    )
    context.log.info(result)

@asset(required_resource_keys={"teradata"}, deps=[read_csv_file])
def insert_rows(context, create_table, read_csv_file):
    data_tuples = [tuple(row) for row in read_csv_file.to_numpy()]
    for row in data_tuples:
        result = context.resources.teradata.execute_query(
            f"INSERT INTO tmp_table (id, name, age, city) VALUES ({row[0]}, '{row[1]}', {row[2]}, '{row[3]}');"
        )
        context.log.info(result)

@asset(required_resource_keys={"teradata"})
def read_table(context, insert_rows):
    result = context.resources.teradata.execute_query("select * from tmp_table;", True)
    context.log.info(result)

This Dagster pipeline defines a series of assets that interact with Teradata. It starts by reading data from a CSV file, then drops and recreates a table in Teradata. After that, it inserts rows from the CSV into the table and finally retrieves the data from the table.

Define the Pipeline Definitions

The next step is to configure the pipeline by defining the necessary resources and jobs.

Edit the definitions.py File: Open dagster_quickstart/definitions.py and define your Dagster pipeline as follows:

from dagster import EnvVar, Definitions
from dagster_teradata import TeradataResource

from .assets import read_csv_file, read_table, create_table, drop_table, insert_rows

# Define the pipeline and resources
defs = Definitions(
    assets=[read_csv_file, read_table, create_table, drop_table, insert_rows],
    resources={
        "teradata": TeradataResource(
            host=EnvVar("TERADATA_HOST"),
            user=EnvVar("TERADATA_USER"),
            password=EnvVar("TERADATA_PASSWORD"),
            database=EnvVar("TERADATA_DATABASE"),
        )
    },
)

This code sets up a Dagster project that interacts with Teradata by defining assets and resources:

  1. It imports the necessary modules from Dagster and dagster-teradata.
  2. It imports asset functions (read_csv_file, read_table, create_table, drop_table, insert_rows) from the assets.py module.
  3. It registers these assets with Dagster using Definitions, allowing Dagster to track and execute them.
  4. It defines a Teradata resource (TeradataResource) that reads database connection details from environment variables (TERADATA_HOST, TERADATA_USER, TERADATA_PASSWORD, TERADATA_DATABASE).
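
Before starting Dagster, make sure these environment variables are set in the shell that will run it (the values below are placeholders); Dagster can also load them from a .env file in the project root:

export TERADATA_HOST=<your-teradata-host>
export TERADATA_USER=<your-username>
export TERADATA_PASSWORD=<your-password>
export TERADATA_DATABASE=<your-database>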

Running the Pipeline

After setting up the project, you can now run your Dagster pipeline:

  1. Start the Dagster Dev Server: In your terminal, navigate to the root directory of your project and run:

    dagster dev

    After executing this command, the Dagster logs are displayed directly in the terminal, and any errors encountered during startup are logged there as well. Once you see a message similar to:

    2025-02-04 09:15:46 +0530 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 32564

    the Dagster webserver is running successfully and you can proceed to the next step.


  2. Access the Dagster UI: Open a web browser and navigate to http://127.0.0.1:3000. This will open the Dagster UI where you can manage and monitor your pipelines.


  3. Run the Pipeline:

  • In the top navigation of the Dagster UI, click Assets > View global asset lineage.
  • Click Materialize to execute the pipeline.
  • In the popup window, click View to see the details of the pipeline run.
  4. Monitor the Run: The Dagster UI allows you to visualize the pipeline's progress, view logs, and inspect the status of each step. You can switch between different views to see the execution logs and metadata for each asset.

Below are some of the operations provided by the TeradataResource:

1. Execute a Query (execute_query)

This operation executes a SQL query within Teradata Vantage.

Args:

  • sql (str) – The query to be executed.
  • fetch_results (bool, optional) – If True, fetch the query results. Defaults to False.
  • single_result_row (bool, optional) – If True, return only the first row of the result set. Effective only if fetch_results is True. Defaults to False.
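
For example, a small asset that fetches a single result row (the table name is illustrative):

from dagster import asset

@asset(required_resource_keys={"teradata"})
def tmp_table_count(context):
    # fetch_results=True returns the result set; single_result_row=True keeps only the first row.
    result = context.resources.teradata.execute_query(
        "SELECT COUNT(*) FROM tmp_table;", fetch_results=True, single_result_row=True
    )
    context.log.info(result)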

2. Execute Multiple Queries (execute_queries)

This operation executes a series of SQL queries within Teradata Vantage.

Args:

  • sql_queries (Sequence[str]) – List of queries to be executed in series.
  • fetch_results (bool, optional) – If True, fetch the query results. Defaults to False.
  • single_result_row (bool, optional) – If True, return only the first row of the result set. Effective only if fetch_results is True. Defaults to False.
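
A minimal sketch of running several statements in order with execute_queries (the statements are illustrative):

from dagster import asset

@asset(required_resource_keys={"teradata"})
def reload_tmp_table(context):
    # Statements run in series; results are not fetched here.
    context.resources.teradata.execute_queries(
        [
            "DELETE FROM tmp_table;",
            "INSERT INTO tmp_table (id, name, age, city) VALUES (5, 'Eve', 29, 'Boston');",
        ]
    )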

3. Drop a Database (drop_database)

This operation drops one or more databases from Teradata Vantage.

Args:

  • databases (Union[str, Sequence[str]]) – Database name or list of database names to drop.

4. Drop a Table (drop_table)

This operation drops one or more tables from Teradata Vantage.

Args:

  • tables (Union[str, Sequence[str]]) – Table name or list of table names to drop.
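
Both drop operations accept a single name or a list of names, as in this sketch (object names are illustrative):

from dagster import asset

@asset(required_resource_keys={"teradata"})
def cleanup_objects(context):
    # Drop two tables, then a scratch database.
    context.resources.teradata.drop_table(["tmp_table", "staging_table"])
    context.resources.teradata.drop_database("scratch_db")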

Data Transfer from AWS S3 to Teradata Vantage Using dagster-teradata:

import os

from dagster import job, op, Definitions, DagsterError
from dagster_aws.s3 import S3Resource
from dagster_teradata import TeradataResource

s3_resource = S3Resource(
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    aws_session_token=os.getenv("AWS_SESSION_TOKEN"),
)

td_resource = TeradataResource(
    host=os.getenv("TERADATA_HOST"),
    user=os.getenv("TERADATA_USER"),
    password=os.getenv("TERADATA_PASSWORD"),
    database=os.getenv("TERADATA_DATABASE"),
)

@op(required_resource_keys={"teradata"})
def drop_existing_table(context):
    context.resources.teradata.drop_table("people")
    return "Tables Dropped"

@op(required_resource_keys={"teradata", "s3"})
def ingest_s3_to_teradata(context, status):
    if status == "Tables Dropped":
        context.resources.teradata.s3_to_teradata(
            s3_resource, os.getenv("AWS_S3_LOCATION"), "people"
        )
    else:
        raise DagsterError("Tables not dropped")

@job(resource_defs={"teradata": td_resource, "s3": s3_resource})
def example_job():
    ingest_s3_to_teradata(drop_existing_table())

defs = Definitions(jobs=[example_job])

The s3_to_teradata method is used to load data from an S3 bucket into a Teradata table. It leverages Teradata Vantage Native Object Store (NOS), which allows direct querying and loading of external object store data (like AWS S3) into Teradata tables.

Arguments Supported by s3_to_teradata

  • s3 (S3Resource): The S3Resource object used to interact with the S3 bucket.

  • s3_source_key (str): The URI specifying the location of the S3 bucket. The URI format is: /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME For more details, refer to: Teradata Documentation - Native Object Store

  • teradata_table (str):
    The name of the Teradata table to which the data will be loaded.

  • public_bucket (bool):
    Indicates whether the provided S3 bucket is public. If True, the objects within the bucket can be accessed via a URL without authentication. If False, the bucket is considered private, and authentication must be provided.
    Defaults to False.

  • teradata_authorization_name (str):
    The name of the Teradata Authorization Database Object, which controls access to the S3 object store.
    For more details, refer to: Teradata Vantage Native Object Store - Setting Up Access
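
For a private bucket, the call combines public_bucket=False with a Teradata Authorization object. A sketch extending the example above (the authorization name is illustrative; s3_resource and os come from that example):

@op(required_resource_keys={"teradata", "s3"})
def ingest_private_s3(context):
    # Load from a private bucket by referencing an existing Teradata Authorization object.
    context.resources.teradata.s3_to_teradata(
        s3_resource,
        os.getenv("AWS_S3_LOCATION"),
        "people",
        public_bucket=False,
        teradata_authorization_name="s3_authorization",
    )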


Data Transfer from Azure Blob to Teradata Vantage Using dagster-teradata:

import os

from dagster import job, op, Definitions, DagsterError
from dagster_azure.adls2 import ADLS2Resource, ADLS2SASToken
from dagster_teradata import TeradataResource

azure_resource = ADLS2Resource(
    storage_account="",
    credential=ADLS2SASToken(token=""),
)

td_resource = TeradataResource(
    host=os.getenv("TERADATA_HOST"),
    user=os.getenv("TERADATA_USER"),
    password=os.getenv("TERADATA_PASSWORD"),
    database=os.getenv("TERADATA_DATABASE"),
)

@op(required_resource_keys={"teradata"})
def drop_existing_table(context):
    context.resources.teradata.drop_table("people")
    return "Tables Dropped"

@op(required_resource_keys={"teradata", "azure"})
def ingest_azure_to_teradata(context, status):
    if status == "Tables Dropped":
        context.resources.teradata.azure_blob_to_teradata(
            azure_resource,
            "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
            "people",
            True,
        )
    else:
        raise DagsterError("Tables not dropped")

@job(resource_defs={"teradata": td_resource, "azure": azure_resource})
def example_job():
    ingest_azure_to_teradata(drop_existing_table())

defs = Definitions(jobs=[example_job])

The azure_blob_to_teradata method is used to load data from Azure Data Lake Storage (ADLS) into a Teradata table. This method leverages Teradata Vantage Native Object Store (NOS) to directly query and load external object store data (such as Azure Blob Storage) into Teradata.

Arguments Supported by azure_blob_to_teradata

  • azure (ADLS2Resource):
    The ADLS2Resource object used to interact with the Azure Blob Storage.

  • blob_source_key (str):
    The URI specifying the location of the Azure Blob object. The format is: /az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION
    For more details, refer to the Teradata documentation:
    Teradata Documentation - Native Object Store

  • teradata_table (str):
    The name of the Teradata table where the data will be loaded.

  • public_bucket (bool, optional):
    Indicates whether the Azure Blob container is public. If True, the objects in the container can be accessed without authentication.
    Defaults to False.

  • teradata_authorization_name (str, optional):
    The name of the Teradata Authorization Database Object used to control access to the Azure Blob object store. This is required for secure access to private containers.
    Defaults to an empty string.
    For more details, refer to the documentation:
    Teradata Vantage Native Object Store - Setting Up Access

Transfer data from Private Blob Storage Container to Teradata instance

To successfully transfer data from a Private Blob Storage Container to a Teradata instance, the following prerequisites are necessary.

  • An Azure account. You can start with a free account.

  • Create an Azure storage account

  • Create a blob container under Azure storage account

  • Upload files in CSV, JSON, or Parquet format to the blob container

  • Create a Teradata Authorization object with the Azure Blob Storage Account and the Account Secret Key

    CREATE AUTHORIZATION azure_authorization USER 'azuretestquickstart' PASSWORD 'AZURE_BLOB_ACCOUNT_SECRET_KEY'
    note

    Replace AZURE_BLOB_ACCOUNT_SECRET_KEY with the access key of the azuretestquickstart Azure storage account.
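
Once the authorization object exists, a sketch of loading from the private container, building on the Azure example above (the container path is a placeholder and azure_resource is the ADLS2Resource defined in that example):

@op(required_resource_keys={"teradata", "azure"})
def ingest_private_blob(context):
    # Reference the authorization object instead of treating the container as public.
    context.resources.teradata.azure_blob_to_teradata(
        azure_resource,
        "/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION",
        "people",
        public_bucket=False,
        teradata_authorization_name="azure_authorization",
    )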


Manage VantageCloud Lake Compute Clusters with dagster-teradata:

from dagster import Definitions, DagsterError, op, materialize, job
from dagster_dbt import DbtCliResource
from dagster_teradata import teradata_resource, TeradataResource

from .assets import jaffle_shop_dbt_assets
from .project import jaffle_shop_project
from .schedules import schedules

@op(required_resource_keys={"teradata"})
def create_compute_cluster(context):
    context.resources.teradata.create_teradata_compute_cluster(
        "ShippingCG01",
        "Shipping",
        "STANDARD",
        "TD_COMPUTE_MEDIUM",
        "MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(1) INITIALLY_SUSPENDED('FALSE')",
    )
    return "Compute Cluster Created"

@op(required_resource_keys={"teradata", "dbt"})
def run_dbt(context, status):
    if status == "Compute Cluster Created":
        materialize(
            [jaffle_shop_dbt_assets],
            resources={"dbt": DbtCliResource(project_dir=jaffle_shop_project)},
        )
        return "DBT Run Completed"
    else:
        raise DagsterError("DBT Run Failed")

@op(required_resource_keys={"teradata"})
def drop_compute_cluster(context, status):
    if status == "DBT Run Completed":
        context.resources.teradata.drop_teradata_compute_cluster("ShippingCG01", "Shipping", True)
    else:
        raise DagsterError("DBT Run Failed")

@job(resource_defs={"teradata": teradata_resource, "dbt": DbtCliResource})
def example_job():
    drop_compute_cluster(run_dbt(create_compute_cluster()))

defs = Definitions(
    assets=[jaffle_shop_dbt_assets],
    jobs=[example_job],
    schedules=schedules,
    resources={
        "dbt": DbtCliResource(project_dir=jaffle_shop_project),
        "teradata": TeradataResource(),
    },
)

Teradata VantageCloud Lake provides robust compute cluster management capabilities, enabling users to dynamically allocate, suspend, resume, and delete compute resources. These operations are fully supported through dagster-teradata, allowing users to manage compute clusters directly within their Dagster pipelines. This integration ensures optimal performance, scalability, and cost efficiency. The following operations facilitate seamless compute cluster management within Dagster:

1. Create a Compute Cluster (create_teradata_compute_cluster)

This operation provisions a new compute cluster within Teradata VantageCloud Lake using dagster-teradata. It enables users to define the cluster's configuration, including compute profiles, resource allocation, and query execution strategies, directly within a Dagster job.

Args:

  • compute_profile_name (str) – Specifies the name of the compute profile.
  • compute_group_name (str) – Identifies the compute group to which the profile belongs.
  • query_strategy (str, optional, default="STANDARD") – Defines the method used by the Teradata Optimizer to execute SQL queries efficiently. Acceptable values:
    • STANDARD – The default strategy at the database level, optimized for general query execution.
    • ANALYTIC – Optimized for complex analytical workloads.
  • compute_map (Optional[str], default=None) – Maps compute resources to specific nodes within the cluster.
  • compute_attribute (Optional[str], default=None) – Specifies additional configuration attributes for the compute profile, such as:
    • MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE')
  • timeout (int, optional, default=constants.CC_OPR_TIME_OUT) – The maximum duration (in seconds) to wait for the cluster creation process to complete. Default: 20 minutes.

2. Suspend a Compute Cluster (suspend_teradata_compute_cluster)

This operation temporarily suspends a compute cluster within Teradata VantageCloud Lake using dagster-teradata, reducing resource consumption while retaining the compute profile for future use.

Args:

  • compute_profile_name (str) – Specifies the name of the compute profile.
  • compute_group_name (str) – Identifies the compute group associated with the profile.
  • timeout (int, optional, default=constants.CC_OPR_TIME_OUT) – The maximum wait time for the suspension process to complete. Default: 20 minutes.

3. Resume a Compute Cluster (resume_teradata_compute_cluster)

This operation restores a previously suspended compute cluster using dagster-teradata, allowing workloads to resume execution within a Dagster pipeline.

Args:

  • compute_profile_name (str) – Specifies the name of the compute profile.
  • compute_group_name (str) – Identifies the compute group associated with the profile.
  • timeout (int, optional, default=constants.CC_OPR_TIME_OUT) – The maximum wait time for the resumption process to complete. Default: 20 minutes.
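
A sketch of suspending and resuming the cluster created in the job above (the profile and group names match that example):

@op(required_resource_keys={"teradata"})
def suspend_compute_cluster(context):
    # Temporarily stop the compute profile to reduce resource consumption.
    context.resources.teradata.suspend_teradata_compute_cluster("ShippingCG01", "Shipping")

@op(required_resource_keys={"teradata"})
def resume_compute_cluster(context):
    # Bring the same profile back online when workloads need to run again.
    context.resources.teradata.resume_teradata_compute_cluster("ShippingCG01", "Shipping")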

4. Delete a Compute Cluster (drop_teradata_compute_cluster)

This operation removes a compute cluster from Teradata VantageCloud Lake using dagster-teradata, with an option to delete the associated compute group. You can run this operation directly from your Dagster workflow.

Args:

  • compute_profile_name (str) – Specifies the name of the compute profile.
  • compute_group_name (str) – Identifies the compute group associated with the profile.
  • delete_compute_group (bool, optional, default=False) – Determines whether the compute group should be deleted:
    • True – Deletes the compute group.
    • False – Retains the compute group without modifications.

These operations are designed to be fully integrated into dagster-teradata for managing compute clusters in Teradata VantageCloud Lake. By utilizing these operations within Dagster jobs, users can optimize resource allocation, perform complex transformations, and automate compute cluster management to align with workload demands.


Teradata Operators Documentation

BTEQ Operator

The bteq_operator method enables execution of SQL statements or BTEQ (Basic Teradata Query) scripts using the Teradata BTEQ utility. It supports running commands either on the local machine or on a remote machine over SSH — in both cases, the BTEQ utility must be installed on the target system.

Key Features

  • Executes SQL provided as a string or from a script file (only one can be used at a time).
  • Supports custom encoding for the script or session.
  • Configurable timeout and return code handling.
  • Remote execution supports authentication using a password or an SSH key.
  • Works in both local and remote setups, provided the BTEQ tool is installed on the system where execution takes place.

Ensure that the Teradata BTEQ utility is installed on the machine where the SQL statements or scripts will be executed.

This could be:

  • The local machine where Dagster runs the task, for local execution.
  • The remote host accessed via SSH, for remote execution.
  • If executing remotely, also ensure that an SSH server (e.g., sshd) is running and accessible on the remote machine.

Parameters

  • sql: SQL statement(s) to be executed using BTEQ. (optional, mutually exclusive with file_path)
  • file_path: If provided, this file will be used instead of the sql content. This path represents remote file path when executing remotely via SSH, or local file path when executing locally. (optional, mutually exclusive with sql)
  • remote_host: Hostname or IP address for remote execution. If not provided, execution is assumed to be local. (optional)
  • remote_user: Username used for SSH authentication on the remote host. Required if remote_host is specified.
  • remote_password: Password for SSH authentication. Optional, and used as an alternative to ssh_key_path.
  • ssh_key_path: Path to the SSH private key used for authentication. Optional, and used as an alternative to remote_password.
  • remote_port: SSH port number for the remote host. Defaults to 22 if not specified. (optional)
  • remote_working_dir: Temporary directory location on the remote host (via SSH) where the BTEQ script will be transferred and executed. Defaults to /tmp if not specified. This is only applicable when remote_host is provided.
  • bteq_script_encoding: Character encoding for the BTEQ script file. Defaults to ASCII if not specified.
  • bteq_session_encoding: Character set encoding for the BTEQ session. Defaults to ASCII if not specified.
  • bteq_quit_rc: Accepts a single integer, list, or tuple of return codes. Specifies which BTEQ return codes should be treated as successful, allowing subsequent tasks to continue execution.
  • timeout: Timeout (in seconds) for executing the BTEQ command. Default is 600 seconds (10 minutes).
  • timeout_rc: Return code to use if the BTEQ execution fails due to a timeout. To allow Ops execution to continue after a timeout, include this value in bteq_quit_rc. If not specified, a timeout will raise an exception and stop the Ops.

Returns

  • Output of the BTEQ execution, or None if no output was produced.

Raises

  • ValueError: For invalid input or configuration
  • DagsterError: If BTEQ execution fails or times out

Notes

  • Either sql or file_path must be provided, but not both.
  • For remote execution, provide either remote_password or ssh_key_path (not both).
  • Encoding and timeout handling are customizable.
  • Validates remote port and authentication parameters.

Example Usage

# Local execution with direct SQL
output = bteq_operator(sql="SELECT * FROM table;")

# Remote execution with file
output = bteq_operator(
    file_path="script.sql",
    remote_host="example.com",
    remote_user="user",
    ssh_key_path="/path/to/key.pem",
)
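
A sketch combining the timeout and return-code options described above, in the same call style as the examples (values are illustrative):

# Treat BTEQ return codes 0 and 4 as success, and map a timeout to return code 4
output = bteq_operator(
    file_path="/scripts/nightly_load.bteq",
    remote_host="example.com",
    remote_user="user",
    remote_password="secret",
    timeout=1200,
    timeout_rc=4,
    bteq_quit_rc=[0, 4],
)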

DDL Operator

The ddl_operator method executes DDL (Data Definition Language) statements on Teradata using the Teradata Parallel Transporter (TPT). It supports both local and remote execution (via SSH), allowing you to manage Teradata schema objects seamlessly within Dagster pipelines.

Key Features

  • Executes one or more DDL statements in a single operation.
  • Supports both local and remote execution via SSH.
  • Handles custom error codes to allow continued workflow execution.
  • Allows configuration of remote working directory and SSH credentials.
  • Supports custom job naming for easier tracking and debugging.
  • Provides robust validation for input parameters and execution environment.

Ensure that the Teradata Parallel Transporter (TPT) is installed on the system where DDL execution will occur. This could be:

  • The local machine running Dagster (for local execution).
  • A remote host accessible via SSH (for remote execution).
  • For remote setups, ensure an SSH server (e.g., sshd) is active and reachable.

Parameters

  • ddl (list[str]) – List of DDL statements to be executed. Each entry must be a valid SQL string. Required.
  • error_list (int | List[int], optional) – Single integer or list of integers representing error codes to ignore.
  • remote_working_dir (str, optional, default="/tmp") – Directory on the remote host used as a working directory for temporary DDL files or logs.
  • ddl_job_name (str, optional) – A descriptive name for the DDL job to assist in tracking and logging.
  • remote_host (str, optional) – Hostname or IP address of the remote machine where the DDL should run. If not specified, execution is local.
  • remote_user (str) – SSH username for authentication on the remote host. Required if remote_host is provided.
  • remote_password (str, optional) – Password for SSH authentication. Used as an alternative to ssh_key_path.
  • ssh_key_path (str, optional) – Path to the SSH private key file used for authentication. Used as an alternative to remote_password.
  • remote_port (int, optional, default=22) – SSH port for the remote host. Must be within range 1–65535.

Returns

  • Optional[int] — Return code from the DDL execution.
    Returns None if no return code is applicable or produced.

Raises

  • ValueError — If input validation fails (e.g., missing DDL list, invalid SSH configuration, or bad port number).
  • Exception — If the DDL execution fails and the resulting error code is not part of the error_list.

Notes

  • The ddl parameter must be a non-empty list of valid SQL DDL statements.
  • For remote execution, either remote_password or ssh_key_path must be provided (but not both).
  • Invalid or missing SSH configuration parameters will raise validation errors before execution.
  • Ensures strong validation for parameters like SSH port and DDL syntax structure.

Example Usage

Local Execution with Multiple DDL Statements

return_code = ddl_operator(
    ddl=[
        "CREATE TABLE employees (id INT, name VARCHAR(100));",
        "CREATE INDEX idx_name ON employees(name);",
    ]
)

Remote Execution Using SSH Key Authentication

return_code = ddl_operator(
    ddl=["DROP TABLE IF EXISTS sales;"],
    remote_host="td-server.example.com",
    remote_user="td_admin",
    ssh_key_path="/home/td_admin/.ssh/id_rsa",
    ddl_job_name="drop_sales_table",
)

Remote Execution Using Password Authentication with Ignored Error Codes

return_code = ddl_operator(
    ddl=["CREATE DATABASE reporting;"],
    remote_host="192.168.1.100",
    remote_user="teradata",
    remote_password="password123",
    error_list=[3807],  # Ignore 'database already exists' error
    ddl_job_name="create_reporting_db",
)

Integration with Dagster

from dagster import asset

@asset(required_resource_keys={"teradata"})
def create_tables(context):
    context.resources.teradata.ddl_operator(
        ddl=[
            "CREATE TABLE sales (id INTEGER, amount DECIMAL(10,2));",
            "CREATE TABLE customers (cust_id INTEGER, name VARCHAR(100));",
        ]
    )

TDLoad Operator

The tdload_operator method enables execution of TDLoad jobs for transferring data between Teradata tables or between Teradata and external files.
It provides a unified interface to run TDLoad operations either locally or remotely via SSH, allowing data ingestion and export tasks to be seamlessly integrated within Dagster pipelines.

Key Features

  • Executes TDLoad operations for data import/export between Teradata and external files.
  • Supports both local and remote execution via SSH.
  • Allows custom job configuration, variable files, and TDLoad command options.
  • Handles multiple data source and target formats (e.g., CSV, TEXT, PARQUET).
  • Supports text delimiters for structured data handling.
  • Provides strong validation for remote execution parameters and ports.

Ensure that the TDLoad utility is installed on the machine where execution will occur.

This could be:

  • The local machine running Dagster (for local execution).
  • A remote host accessible via SSH (for remote execution).
  • For remote setups, ensure that an SSH server (e.g., sshd) is active and reachable.

Parameters

  • source_table (str, optional) – Name of the source table in Teradata.
  • select_stmt (str, optional) – SQL SELECT statement for extracting data.
  • insert_stmt (str, optional) – SQL INSERT statement for loading data.
  • target_table (str, optional) – Name of the target table in Teradata.
  • source_file_name (str, optional) – Source file path for input data.
  • target_file_name (str, optional) – Target file path for output data.
  • source_format (str, optional) – Format of the source file (e.g., CSV, TEXT).
  • source_text_delimiter (str, optional) – Field delimiter for the source file.
  • target_format (str, optional) – Format of the target file (e.g., CSV, TEXT).
  • target_text_delimiter (str, optional) – Field delimiter for the target file.
  • tdload_options (str, optional) – Additional TDLoad options to customize execution.
  • tdload_job_name (str, optional) – Name assigned to the TDLoad job.
  • tdload_job_var_file (str, optional) – Path to the TDLoad job variable file.
  • remote_working_dir (str, optional, default="/tmp") – Directory on the remote host for temporary TDLoad job files.
  • remote_host (str, optional) – Hostname or IP of the remote machine for execution.
  • remote_user (str) – Username for SSH authentication on the remote host. Required if remote_host is specified.
  • remote_password (str, optional) – Password for SSH authentication. Alternative to ssh_key_path.
  • ssh_key_path (str, optional) – Path to SSH private key used for authentication. Alternative to remote_password.
  • remote_port (int, optional, default=22) – SSH port for the remote connection (1–65535).

Return Value

  • int | None — Return code from the TDLoad operation.
    Returns None if no return code is applicable or produced.

Raises

  • ValueError — For invalid parameter combinations (e.g., both password and key provided, invalid port, or missing SSH credentials).
  • Exception — If the TDLoad execution fails for reasons not covered by user configuration.

Notes

  • Either a source_table/select_stmt or a source_file_name must be provided as the input source.
  • Either a target_table/insert_stmt or a target_file_name must be provided as the target.
  • For remote execution, only one authentication method can be used — either remote_password or ssh_key_path.
  • The remote_port must be a valid port number between 1 and 65535.
  • The tdload_job_name and tdload_job_var_file can help manage reusable or parameterized load operations.

Examples

1. File to Table

Load data from a file into a Teradata table.

@asset(required_resource_keys={"teradata"})
def file_to_table_load(context):
    context.resources.teradata.tdload_operator(
        source_file_name="/data/customers.csv",
        target_table="customers",
        source_format="Delimited",
        source_text_delimiter="|",
    )

2. Table to File

Export data from a Teradata table to a file.

@asset(required_resource_keys={"teradata"})
def table_to_file_export(context):
    context.resources.teradata.tdload_operator(
        source_table="sales",
        target_file_name="/data/sales_export.csv",
        target_format="Delimited",
        target_text_delimiter=",",
    )

3. Table to Table

Transfer data between Teradata tables.

@asset(required_resource_keys={"teradata"})
def table_to_table_transfer(context):
    context.resources.teradata.tdload_operator(
        source_table="staging_sales",
        target_table="prod_sales",
        insert_stmt="INSERT INTO prod_sales SELECT * FROM staging_sales WHERE amount > 1000",
    )

4. Custom SELECT to File

Export data using a custom SQL query.

@asset(required_resource_keys={"teradata"})
def custom_export(context):
    context.resources.teradata.tdload_operator(
        select_stmt="SELECT customer_id, SUM(amount) FROM sales GROUP BY customer_id",
        target_file_name="/data/customer_totals.csv",
    )

5. Using Job Variable Files

@asset(required_resource_keys={"teradata"})
def use_job_var_file(context):
    context.resources.teradata.tdload_operator(
        tdload_job_var_file="/config/load_job_vars.txt",
        tdload_options="-j my_load_job",
    )

6. Remote Execution with SSH Key

@asset(required_resource_keys={"teradata"})
def remote_tpt_operation(context):
    context.resources.teradata.tdload_operator(
        source_file_name="/remote/data/input.csv",
        target_table="remote_table",
        remote_host="td-prod.company.com",
        remote_user="tdadmin",
        ssh_key_path="/home/user/.ssh/td_key",
        remote_working_dir="/tmp/tpt_work",
    )

7. Custom TPT Options

@asset(required_resource_keys={"teradata"})
def custom_tpt_options(context):
    context.resources.teradata.tdload_operator(
        source_table="large_table",
        target_file_name="/data/export.csv",
        tdload_options="-f CSV -m 4 -s ,",  # Format: CSV, 4 streams, comma separator
        tdload_job_name="custom_export_job",
    )

Error Handling

Both operators include comprehensive error handling:

  • Connection errors: Automatic retry and cleanup
  • SSH failures: Graceful degradation with detailed logging
  • TPT execution errors: Proper exit code handling and error messages
  • Resource cleanup: Automatic cleanup of temporary files

Best Practices

  • Use error_list for idempotent DDL operations
  • Specify remote_working_dir for SSH operations
  • Use meaningful job names for monitoring and debugging
  • Test with small datasets before scaling up
  • Monitor TPT job logs for performance optimization

Further reading