commit 5ae287424a6da494ad1b71658907c39ff14be650 Author: Zhongwei Li Date: Sun Nov 30 09:02:56 2025 +0800 Initial commit diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..fc6b4a3 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,13 @@ +{ + "name": "sdk-skills", + "description": "Skills for using Treasure Data SDKs including JavaScript SDK for browser-based event tracking and pytd (Python SDK) for querying data, importing pandas DataFrames, and building analytical pipelines", + "version": "0.0.0-2025.11.28", + "author": { + "name": "Treasure Data", + "email": "support@treasuredata.com" + }, + "skills": [ + "./skills/javascript", + "./skills/python" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..f7f36e6 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# sdk-skills + +Skills for using Treasure Data SDKs including JavaScript SDK for browser-based event tracking and pytd (Python SDK) for querying data, importing pandas DataFrames, and building analytical pipelines diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..5508066 --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,48 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:treasure-data/td-skills:sdk-skills", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "e0c11766a95ad8f5fbf9df27ab4f0a9ab68985b3", + "treeHash": "c548802f4867e35e36431653df8a2b46750d6364492c7713d2be881faba1086b", + "generatedAt": "2025-11-28T10:28:46.415607Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "sdk-skills", + "description": "Skills for using Treasure Data SDKs including JavaScript SDK for browser-based event tracking and pytd (Python SDK) for querying data, importing pandas DataFrames, and building analytical pipelines" + }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "bd716c3683b3a461922279d012c0f015624c18fe877f142dd2ccd656f685ae5f" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "4f326d7357917a928996ba42c317f01a592ef0e42e8a8b3b96b5e5911e4a8c1e" + }, + { + "path": "skills/python/SKILL.md", + "sha256": "f4dd6da826f89bc3d9323d3d6c4b273a8d850f579dc21fdcb837d9a9cbd0ffb7" + }, + { + "path": "skills/javascript/SKILL.md", + "sha256": "150353b2e36ab029654a8eaf692d45b4d41dd062b6f9dd02360ab1f87dcb68f2" + } + ], + "dirSha256": "c548802f4867e35e36431653df8a2b46750d6364492c7713d2be881faba1086b" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/skills/javascript/SKILL.md b/skills/javascript/SKILL.md new file mode 100644 index 0000000..bf47201 --- /dev/null +++ b/skills/javascript/SKILL.md @@ -0,0 +1,741 @@ +--- +name: td-javascript-sdk +description: Expert assistance for importing data to Treasure Data using the JavaScript SDK. Use this skill when users need help with browser-based event tracking, page analytics, client-side data collection, or implementing TD's JS SDK for web applications. +--- + +# Treasure Data JavaScript SDK + +Expert assistance for implementing client-side data collection and event tracking with the Treasure Data JavaScript SDK. 
+ +## When to Use This Skill + +Use this skill when: +- Implementing browser-based event tracking for web applications +- Setting up page view analytics and user behavior tracking +- Collecting client-side data (clicks, form submissions, user interactions) +- Integrating TD data collection into JavaScript/frontend applications +- Migrating from other analytics platforms to TD's event tracking +- Troubleshooting JS SDK configuration or data import issues + +## Core Principles + +### 1. Installation Methods + +**Script Loader (Recommended for Most Cases):** +```html + +``` + +**NPM Package (For Bundlers like Webpack/Browserify):** +```bash +npm install --save td-js-sdk +``` + +```javascript +import Treasure from 'td-js-sdk'; +``` + +**Important:** The SDK is **browser-only** and does not work with Node.js. For server-side data import, use the REST API or other SDKs. + +### 2. SDK Initialization + +Initialize the SDK with your database name and write-only API key: + +```javascript +var treasure = new Treasure({ + database: 'your_database_name', + writeKey: 'your_write_only_api_key', + startInSignedMode: false // Start in anonymous mode (default) +}); +``` + +**Configuration Options:** +- `database` (required): TD database name +- `writeKey` (required): Write-only API key from TD console +- `clientId`: Custom UUID for client identification (auto-generated if not provided) +- `startInSignedMode`: `false` for anonymous mode (default), `true` to include PII +- `host`: Custom ingestion endpoint (advanced use cases) +- `development`: Set to `true` to enable console logging for debugging + +**Obtaining API Keys:** +1. Log in to Treasure Data console +2. Navigate to your profile settings +3. Generate a write-only API key +4. Never use master or read-write keys in client-side code + +### 3. 
Data Import Methods + +#### addRecord() - Custom Event Data + +Send custom data objects to specified tables: + +```javascript +// Basic usage +treasure.addRecord('events', { + event_type: 'purchase', + product_id: 'SKU-12345', + amount: 99.99, + currency: 'USD', + user_id: 'user_abc123' +}); + +// With callback +treasure.addRecord('user_actions', { + action: 'form_submit', + form_id: 'newsletter_signup', + success: true +}, function(response) { + console.log('Data sent successfully', response); +}); +``` + +#### trackPageview() - Page View Tracking + +Track page impressions with automatic context data: + +```javascript +// Track to 'pageviews' table +treasure.trackPageview('pageviews'); + +// Track with custom properties +treasure.trackPageview('pageviews', { + category: 'product_page', + product_id: 'SKU-12345' +}); +``` + +**Automatic Context Data Included:** +- Page URL, title, referrer, host, path +- Screen resolution, viewport dimensions, color depth +- Browser language, user agent, platform +- TD client ID, SDK version +- Timestamp + +#### trackEvent() - Custom Event Tracking + +Track custom events with context: + +```javascript +// Basic event +treasure.trackEvent('button_click', { + button_id: 'cta_signup', + location: 'hero_section' +}); + +// Complex event with nested data +treasure.trackEvent('video_interaction', { + video_id: 'intro_video_v2', + action: 'play', + timestamp_seconds: 45, + playback_rate: 1.0, + quality: '1080p' +}); +``` + +## Common Patterns + +### Pattern 1: E-commerce Tracking + +```javascript +// Initialize SDK +var treasure = new Treasure({ + database: 'ecommerce_analytics', + writeKey: 'your_write_only_key' +}); + +// Set global context (user session data) +treasure.set('$global', { + user_id: getCurrentUserId(), + session_id: getSessionId(), + environment: 'production' +}); + +// Track product views +function trackProductView(product) { + treasure.trackEvent('product_view', { + product_id: product.id, + product_name: product.name, + category: product.category, + price: product.price, + currency: 'USD' + }); +} + +// Track add to cart +function trackAddToCart(product, quantity) { + treasure.trackEvent('add_to_cart', { + product_id: product.id, + quantity: quantity, + price: product.price, + total_value: product.price * quantity + }); +} + +// Track purchase completion +function trackPurchase(order) { + treasure.addRecord('purchases', { + order_id: order.id, + total_amount: order.total, + currency: 'USD', + items: order.items, + payment_method: order.payment_method, + shipping_address: order.shipping.country + }); +} +``` + +**Explanation:** This pattern sets up comprehensive e-commerce tracking with global context shared across all events, ensuring consistent user and session identification. 
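+
+Nested arrays such as `order.items` may not map cleanly onto a flat table schema (see the schema troubleshooting notes later in this document). A minimal sketch of one way to flatten line items into per-row records, assuming the same `treasure` instance, a hypothetical `purchase_items` table, and `product_id`/`quantity`/`price` fields on each item:
+
+```javascript
+// Send one flat record per purchased item; order-level fields are repeated
+// on every row so the table stays simple to query.
+function trackPurchaseItems(order) {
+  order.items.forEach(function (item, index) {
+    treasure.addRecord('purchase_items', {
+      order_id: order.id,
+      item_index: index,
+      product_id: item.product_id,
+      quantity: item.quantity,
+      unit_price: item.price,
+      currency: 'USD'
+    });
+  });
+}
+```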
+
+### Pattern 2: Form Tracking with Error Handling
+
+```javascript
+// Track whether the form was actually submitted (used by the abandonment check below)
+var formSubmitted = false;
+
+// Form submission tracking
+document.getElementById('signup-form').addEventListener('submit', function(e) {
+  e.preventDefault();
+  formSubmitted = true;
+
+  var formData = {
+    form_id: 'user_signup',
+    email: this.email.value,
+    plan: this.plan.value,
+    referral_source: document.referrer
+  };
+
+  // Send to TD before form submission
+  treasure.addRecord('form_submissions', formData, function(error, response) {
+    if (error) {
+      console.error('TD tracking failed:', error);
+      // Continue with form submission even if tracking fails
+    }
+
+    // Proceed with actual form submission
+    submitForm(formData);
+  });
+});
+
+// Form abandonment tracking
+var formStarted = false;
+document.querySelectorAll('form input').forEach(function(input) {
+  input.addEventListener('focus', function() {
+    if (!formStarted) {
+      formStarted = true;
+      treasure.trackEvent('form_started', {
+        form_id: 'user_signup'
+      });
+    }
+  });
+});
+
+window.addEventListener('beforeunload', function() {
+  if (formStarted && !formSubmitted) {
+    treasure.trackEvent('form_abandoned', {
+      form_id: 'user_signup',
+      fields_completed: getCompletedFieldCount()
+    });
+  }
+});
+```
+
+**Explanation:** Tracks form interactions including starts, submissions, and abandonments. Uses callbacks so that tracking doesn't block the user experience.
+
+### Pattern 3: User Session Tracking with Privacy Controls
+
+```javascript
+var treasure = new Treasure({
+  database: 'user_analytics',
+  writeKey: 'your_write_only_key',
+  startInSignedMode: false // Start anonymous
+});
+
+// Check user consent
+function initializeTracking() {
+  var hasConsent = checkUserConsent();
+
+  if (hasConsent) {
+    // User consented, enable full tracking
+    treasure.setSignedMode();
+    treasure.unblockEvents();
+
+    treasure.set('$global', {
+      user_id: getUserId(),
+      consent_given: true,
+      consent_date: new Date().toISOString()
+    });
+  } else {
+    // User declined, use anonymous mode
+    treasure.setAnonymousMode();
+    treasure.blockEvents(); // Or collect minimal data
+  }
+}
+
+// Update when consent changes
+function onConsentGranted() {
+  treasure.setSignedMode();
+  treasure.unblockEvents();
+  treasure.resetUUID(); // Generate new client ID
+
+  treasure.trackEvent('consent_granted', {
+    timestamp: new Date().toISOString()
+  });
+}
+
+function onConsentRevoked() {
+  treasure.trackEvent('consent_revoked', {
+    timestamp: new Date().toISOString()
+  });
+
+  treasure.setAnonymousMode();
+  treasure.blockEvents();
+}
+```
+
+**Explanation:** Implements GDPR/privacy-compliant tracking with consent management. Starts in anonymous mode and only enables full tracking after user consent.
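+
+Pattern 3 assumes a `checkUserConsent()` helper. A minimal sketch using `localStorage` as the consent store (the key name `analytics_consent` is illustrative; a production integration would read the decision from your consent management platform instead):
+
+```javascript
+// Illustrative consent persistence: survives page reloads via localStorage.
+function checkUserConsent() {
+  return window.localStorage.getItem('analytics_consent') === 'granted';
+}
+
+// Call this from your consent banner's accept/decline handlers.
+function saveConsent(granted) {
+  window.localStorage.setItem('analytics_consent', granted ? 'granted' : 'denied');
+  if (granted) {
+    onConsentGranted();  // defined in Pattern 3 above
+  } else {
+    onConsentRevoked();
+  }
+}
+```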
+ +### Pattern 4: Single Page Application (SPA) Tracking + +```javascript +// Initialize once +var treasure = new Treasure({ + database: 'spa_analytics', + writeKey: 'your_write_only_key' +}); + +// Track route changes (example with vanilla JS) +var currentPage = window.location.pathname; + +function trackPageChange() { + var newPage = window.location.pathname; + + if (newPage !== currentPage) { + // Track page view + treasure.trackPageview('pageviews', { + previous_page: currentPage, + navigation_type: 'spa_route_change' + }); + + currentPage = newPage; + } +} + +// Listen for history changes +window.addEventListener('popstate', trackPageChange); + +// Override pushState and replaceState +var pushState = history.pushState; +history.pushState = function() { + pushState.apply(history, arguments); + trackPageChange(); +}; + +var replaceState = history.replaceState; +history.replaceState = function() { + replaceState.apply(history, arguments); + trackPageChange(); +}; + +// Track time on page +var pageStartTime = Date.now(); + +window.addEventListener('beforeunload', function() { + treasure.trackEvent('page_engagement', { + page: window.location.pathname, + time_spent_seconds: Math.round((Date.now() - pageStartTime) / 1000) + }); +}); +``` + +**Explanation:** Handles SPA routing by intercepting navigation events and tracking virtual page views. Includes time-on-page metrics for engagement analysis. + +## Best Practices + +1. **Use Write-Only API Keys** - Never expose master or read-write keys in client-side code. Generate write-only keys specifically for JS SDK use. + +2. **Set Global Defaults** - Use `treasure.set('$global', {...})` for properties that apply to all events (user_id, environment, app_version). + +3. **Table-Level Defaults** - Set common properties per table: `treasure.set('table_name', 'property', 'value')`. + +4. **Asynchronous Loading** - Use the async loader script to avoid blocking page rendering. + +5. **Privacy by Default** - Start in anonymous mode and only enable signed mode after obtaining user consent. + +6. **Validate Data Client-Side** - Check data types and required fields before sending to avoid ingestion errors. + +7. **Use Callbacks for Critical Events** - For important events (purchases, signups), use callbacks to ensure data is sent before navigation. + +8. **Include Context** - Add contextual information (page section, feature version, A/B test variant) to events for richer analysis. + +9. **Batch Related Events** - The SDK handles batching internally, but group related `addRecord` calls together in code for clarity. + +10. **Monitor Console in Development** - Use `development: true` config option during testing to see SDK activity. + +## Common Issues and Solutions + +### Issue: Events Not Appearing in TD + +**Symptoms:** +- Data sent from browser but not visible in TD console +- No errors in browser console + +**Solutions:** +1. **Check API Key Permissions** + - Verify you're using a write-only key + - Ensure key has write access to the specified database + - Check key hasn't been revoked + +2. **Verify Database Name** + ```javascript + // Incorrect: using underscores or special characters incorrectly + database: 'my-database' // May fail + + // Correct: use valid database names + database: 'my_database' // Works + ``` + +3. **Check Browser Network Tab** + - Look for requests to `in.treasuredata.com` + - Verify 200 OK responses + - Check for CORS errors (rare, but possible with custom configurations) + +4. 
**Data Delay** + - Browser SDK uses streaming ingestion + - Data may take 1-5 minutes to appear in TD console + - For v4.0+ SDK, data should appear within ~1 minute + +### Issue: CORS Errors + +**Symptoms:** +- Browser console shows CORS policy errors +- Requests to TD endpoints blocked + +**Solutions:** +1. Ensure using official TD CDN URL for SDK +2. Check custom `host` configuration if set +3. Verify SSL/HTTPS configuration matches your site + +**Example Fix:** +```javascript +// Don't customize host unless necessary +var treasure = new Treasure({ + database: 'your_database', + writeKey: 'your_key' + // Remove custom 'host' setting +}); +``` + +### Issue: Data Not Matching Expected Schema + +**Symptoms:** +- Fields have wrong data types in TD +- Nested objects not properly stored + +**Solutions:** +1. **Flatten Complex Objects** + ```javascript + // Problematic: deeply nested + treasure.addRecord('events', { + user: { + profile: { + name: 'John', + age: 30 + } + } + }); + + // Better: flattened structure + treasure.addRecord('events', { + user_name: 'John', + user_age: 30 + }); + ``` + +2. **Consistent Data Types** + ```javascript + // Ensure consistent types across events + treasure.addRecord('events', { + user_id: String(userId), // Always string + amount: parseFloat(amount), // Always number + timestamp: new Date().toISOString() // Always ISO string + }); + ``` + +3. **Handle Null/Undefined Values** + ```javascript + function sendEvent(data) { + // Remove undefined/null values + var cleanData = Object.keys(data).reduce(function(acc, key) { + if (data[key] != null) { + acc[key] = data[key]; + } + return acc; + }, {}); + + treasure.addRecord('events', cleanData); + } + ``` + +### Issue: SDK Not Loading + +**Symptoms:** +- `Treasure is not defined` errors +- SDK script fails to load + +**Solutions:** +1. **Check Script Placement** + ```html + + + + + ``` + +2. **Verify CDN Availability** + - Check network connectivity + - Verify CDN URL is correct: `https://cdn.treasuredata.com/sdk/2.5/td.min.js` + - Check for ad blockers or privacy extensions blocking the script + +3. 
**Use Ready Callback** + ```javascript + var treasure = new Treasure({ + database: 'your_database', + writeKey: 'your_key' + }); + + treasure.ready(function() { + // SDK fully loaded, safe to track + treasure.trackPageview('pageviews'); + }); + ``` + +## Advanced Topics + +### Custom Client ID Management + +For cross-device tracking or specific user identification: + +```javascript +var treasure = new Treasure({ + database: 'your_database', + writeKey: 'your_key', + clientId: getUserIdFromYourSystem() // Use your own UUID +}); + +// Reset UUID when user logs out +function onUserLogout() { + treasure.resetUUID(); + treasure.setAnonymousMode(); +} +``` + +### Server-Side Cookie Integration + +Fetch server-side TD cookies for unified tracking: + +```javascript +treasure.fetchServerCookie(function(error, cookie) { + if (!error && cookie) { + console.log('Server cookie:', cookie); + // Use cookie data for unified tracking + } +}); +``` + +### Global ID and User Segments + +Fetch TD Global ID and user segments for personalization: + +```javascript +// Fetch Global ID +treasure.fetchGlobalID(function(error, globalId) { + if (!error) { + console.log('TD Global ID:', globalId); + } +}); + +// Fetch user segments (requires audience configuration) +treasure.fetchUserSegments({ + audienceToken: 'your_audience_token', + keys: { + td_global_id: 'global_id_value' + } +}, function(error, segments) { + if (!error) { + console.log('User segments:', segments); + // Use for personalization + } +}); +``` + +### Automatic Click Tracking + +Enable automatic tracking of all link clicks: + +```javascript +treasure.trackClicks({ + element: document.body, // Track clicks within body + tableName: 'clicks', // Target table + attributes: { // Custom attributes to include + page: window.location.pathname + } +}); +``` + +## Testing and Debugging + +### Development Mode + +Enable console logging during development: + +```javascript +var treasure = new Treasure({ + database: 'your_database', + writeKey: 'your_key', + development: true // Enables console logging +}); +``` + +### Manual Testing Checklist + +1. **Verify SDK Initialization** + ```javascript + console.log('Treasure SDK loaded:', typeof Treasure !== 'undefined'); + console.log('Treasure instance:', treasure); + ``` + +2. **Test Event Sending** + ```javascript + treasure.addRecord('test_events', { + test_field: 'test_value', + timestamp: new Date().toISOString() + }, function(error, response) { + console.log('Error:', error); + console.log('Response:', response); + }); + ``` + +3. **Check Network Traffic** + - Open browser DevTools > Network tab + - Filter by `treasuredata.com` + - Verify POST requests return 200 OK + - Inspect request payload + +4. **Verify in TD Console** + - Wait 1-5 minutes for data to appear + - Query your table: `SELECT * FROM your_database.test_events ORDER BY time DESC LIMIT 10` + +### Common Testing Patterns + +```javascript +// Create test helper +function testTreasureSDK() { + console.group('TD SDK Test'); + + // Test 1: SDK loaded + console.log('1. SDK loaded:', typeof Treasure !== 'undefined'); + + // Test 2: Instance created + console.log('2. Instance:', treasure); + + // Test 3: Send test event + treasure.addRecord('sdk_tests', { + test_name: 'connection_test', + timestamp: new Date().toISOString(), + browser: navigator.userAgent + }, function(error, response) { + console.log('3. Test event error:', error); + console.log('3. 
Test event response:', response); + }); + + console.groupEnd(); +} + +// Run tests +testTreasureSDK(); +``` + +## Migration from Other Analytics Platforms + +### From Google Analytics + +```javascript +// GA pageview +ga('send', 'pageview'); + +// TD equivalent +treasure.trackPageview('pageviews'); + +// GA event +ga('send', 'event', 'category', 'action', 'label', value); + +// TD equivalent +treasure.trackEvent('ga_events', { + event_category: 'category', + event_action: 'action', + event_label: 'label', + event_value: value +}); + +// GA user ID +ga('set', 'userId', 'USER_12345'); + +// TD equivalent +treasure.set('$global', { user_id: 'USER_12345' }); +``` + +### From Mixpanel + +```javascript +// Mixpanel track +mixpanel.track('Event Name', { property: 'value' }); + +// TD equivalent +treasure.trackEvent('Event Name', { property: 'value' }); + +// Mixpanel identify +mixpanel.identify('USER_12345'); + +// TD equivalent +treasure.set('$global', { user_id: 'USER_12345' }); + +// Mixpanel people.set +mixpanel.people.set({ $email: 'user@example.com' }); + +// TD equivalent (separate table for user properties) +treasure.addRecord('user_properties', { + user_id: 'USER_12345', + email: 'user@example.com' +}); +``` + +## SDK Version and Updates + +**Current Recommended Version:** 2.5.x + +**Version 4.0+ Important Note:** +If using SDK version 4.0 or higher, configuration changes are required for the new streaming ingestion endpoint. Consult the official migration documentation. + +**Version Check:** +```javascript +console.log('TD SDK Version:', Treasure.version); +``` + +## Resources + +- **Official Documentation:** https://api-docs.treasuredata.com/en/sdk/js-sdk/ +- **GitHub Repository:** https://github.com/treasure-data/td-js-sdk +- **TD Console:** https://console.treasuredata.com/ +- **API Keys:** Profile > API Keys in TD Console +- **Support:** https://support.treasuredata.com/ + +## Related Skills + +- **trino**: Query and analyze data collected via JS SDK using Trino SQL +- **hive**: Query and analyze data using Hive SQL +- **digdag**: Create workflows to process JS SDK event data +- **dbt**: Transform and model JS SDK event data using dbt + +--- + +*Last updated: 2025-01 | SDK Version: 2.5.x* diff --git a/skills/python/SKILL.md b/skills/python/SKILL.md new file mode 100644 index 0000000..3556194 --- /dev/null +++ b/skills/python/SKILL.md @@ -0,0 +1,976 @@ +--- +name: pytd +description: Expert assistance for using pytd (Python SDK) to query and import data with Treasure Data. Use this skill when users need help with Python-based data analysis, querying Presto/Hive, importing pandas DataFrames, bulk data uploads, or integrating TD with Python analytical workflows. +--- + +# pytd - Treasure Data Python SDK + +Expert assistance for querying and importing data to Treasure Data using pytd, the official Python driver for analytical workflows. + +## When to Use This Skill + +Use this skill when: +- Querying Treasure Data from Python scripts or Jupyter notebooks +- Importing pandas DataFrames to TD tables +- Running Presto or Hive queries from Python +- Building data pipelines with Python and TD +- Performing bulk data imports or exports +- Migrating from deprecated pandas-td library +- Integrating TD with Python data science workflows +- Handling large result sets with iterative retrieval + +## Core Principles + +### 1. Installation + +**Standard Installation:** +```bash +pip install pytd +``` + +**Requirements:** +- Python 3.9 or later +- pandas 2.0 or later + +### 2. 
Authentication & Configuration + +**Environment Variables (Recommended):** +```bash +export TD_API_KEY="your_api_key_here" +export TD_API_SERVER="https://api.treasuredata.com/" +``` + +**Client Initialization:** +```python +import pytd + +# Using environment variables +client = pytd.Client(database='sample_datasets') + +# Explicit credentials (not recommended for production) +client = pytd.Client( + apikey='your_api_key', + endpoint='https://api.treasuredata.com/', + database='your_database', + default_engine='presto' # or 'hive' +) +``` + +**Configuration Options:** +- `apikey`: TD API key (read from `TD_API_KEY` env var if not specified) +- `endpoint`: TD API server URL (read from `TD_API_SERVER` env var) +- `database`: Default database name for queries +- `default_engine`: Query engine - `'presto'` (default) or `'hive'` + +**Regional Endpoints:** +- US: `https://api.treasuredata.com/` +- Tokyo: `https://api.treasuredata.co.jp/` +- EU: `https://api.eu01.treasuredata.com/` + +### 3. Querying Data + +#### Basic Query Execution + +```python +import pytd + +client = pytd.Client(database='sample_datasets') + +# Execute Presto query +result = client.query('SELECT symbol, COUNT(1) as cnt FROM nasdaq GROUP BY symbol LIMIT 10') + +# Result format: {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ...]} +print(result['columns']) # ['symbol', 'cnt'] +print(result['data']) # [['AAIT', 590], ['AAL', 82], ...] +``` + +#### Query with Hive Engine + +```python +# Create Hive client +client_hive = pytd.Client( + database='sample_datasets', + default_engine='hive' +) + +# Execute Hive query +result = client_hive.query('SELECT hivemall_version()') +``` + +#### Convert Results to pandas DataFrame + +```python +import pandas as pd + +result = client.query('SELECT * FROM nasdaq LIMIT 100') + +# Convert to DataFrame +df = pd.DataFrame(result['data'], columns=result['columns']) +print(df.head()) +``` + +### 4. 
Writing Data to TD
+
+#### Load DataFrame to Table
+
+```python
+import pandas as pd
+import pytd
+
+# Create sample DataFrame
+df = pd.DataFrame({
+    'user_id': [1, 2, 3, 4],
+    'event_name': ['login', 'purchase', 'logout', 'login'],
+    'amount': [None, 99.99, None, None],
+    'timestamp': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-02', '2024-01-03'])
+})
+
+# Initialize client
+client = pytd.Client(database='your_database')
+
+# Upload DataFrame
+client.load_table_from_dataframe(
+    df,
+    'events',              # table name
+    writer='bulk_import',  # writer type
+    if_exists='overwrite'  # or 'append', 'error', 'ignore'
+)
+```
+
+**Parameters:**
+- `df`: pandas DataFrame to upload
+- `table`: Target table name (can be `'database.table'` or just `'table'`)
+- `writer`: Import method - `'bulk_import'` (default), `'insert_into'`, or `'spark'`
+- `if_exists`: What to do if table exists - `'error'` (default), `'overwrite'`, `'append'`, or `'ignore'`
+
+## Common Patterns
+
+### Pattern 1: ETL Pipeline - Query, Transform, Load
+
+```python
+import pytd
+import pandas as pd
+import numpy as np
+
+# Initialize client
+client = pytd.Client(database='analytics')
+
+# Step 1: Extract - Query data from TD
+query = """
+    SELECT
+        user_id,
+        event_name,
+        event_date,
+        COUNT(*) as event_count
+    FROM raw_events
+    WHERE TD_INTERVAL(time, '-1d', 'JST')
+    GROUP BY user_id, event_name, event_date
+"""
+
+result = client.query(query)
+df = pd.DataFrame(result['data'], columns=result['columns'])
+
+# Step 2: Transform - Process data with pandas
+df['event_date'] = pd.to_datetime(df['event_date'])
+df['is_weekend'] = df['event_date'].dt.dayofweek >= 5
+df['event_count_log'] = np.log1p(df['event_count'])
+
+# Add metadata
+df['processed_at'] = pd.Timestamp.now()
+df['pipeline_version'] = '1.0'
+
+# Step 3: Load - Write back to TD
+client.load_table_from_dataframe(
+    df,
+    'analytics.user_daily_events',
+    writer='bulk_import',
+    if_exists='append'
+)
+
+print(f"Loaded {len(df)} rows to user_daily_events")
+```
+
+**Explanation:** Complete ETL workflow that extracts yesterday's data, performs pandas transformations, and loads the results back to TD. Uses `bulk_import` for efficient loading.
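+
+The query-result-to-DataFrame conversion appears in most of these patterns. A small helper keeps pipelines terse; this is a sketch built only on the documented `{'columns': [...], 'data': [...]}` result shape:
+
+```python
+import pandas as pd
+import pytd
+
+
+def query_to_df(client: pytd.Client, sql: str) -> pd.DataFrame:
+    """Run a query and return the result as a pandas DataFrame."""
+    result = client.query(sql)
+    return pd.DataFrame(result['data'], columns=result['columns'])
+
+
+# Usage:
+# df = query_to_df(client, "SELECT symbol, COUNT(1) AS cnt FROM nasdaq GROUP BY symbol")
+```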
+ +### Pattern 2: Incremental Data Loading + +```python +import pytd +import pandas as pd +from datetime import datetime, timedelta + +client = pytd.Client(database='sales') + +def load_incremental_data(source_file, table_name, date_column='import_date'): + """Load new data incrementally, avoiding duplicates""" + + # Read new data from source + new_data = pd.read_csv(source_file) + new_data[date_column] = datetime.now() + + # Get max date from existing table + try: + result = client.query(f""" + SELECT MAX({date_column}) as max_date + FROM {table_name} + """) + + max_date = result['data'][0][0] if result['data'][0][0] else None + + if max_date: + # Filter only new records + new_data = new_data[new_data[date_column] > max_date] + print(f"Loading {len(new_data)} new records after {max_date}") + else: + print(f"Table empty, loading all {len(new_data)} records") + + except Exception as e: + # Table doesn't exist yet + print(f"Creating new table with {len(new_data)} records") + + if len(new_data) > 0: + client.load_table_from_dataframe( + new_data, + table_name, + writer='bulk_import', + if_exists='append' + ) + print("Load complete") + else: + print("No new data to load") + +# Usage +load_incremental_data('daily_sales.csv', 'sales.transactions') +``` + +**Explanation:** Implements incremental loading by checking the latest timestamp in the target table and only loading newer records. Handles first-time loads gracefully. + +### Pattern 3: Large Result Set Processing with DB-API + +```python +import pytd +from pytd.dbapi import connect + +client = pytd.Client(database='large_dataset') + +# Create DB-API connection +conn = connect(client) +cursor = conn.cursor() + +# Execute query that might timeout with standard query() +cursor.execute(""" + SELECT user_id, event_name, event_time, properties + FROM events + WHERE TD_INTERVAL(time, '-7d', 'JST') +""") + +# Process results iteratively (memory efficient) +batch_size = 10000 +processed_count = 0 + +while True: + rows = cursor.fetchmany(batch_size) + if not rows: + break + + # Process batch + for row in rows: + user_id, event_name, event_time, properties = row + # Process each row + process_event(user_id, event_name, event_time, properties) + + processed_count += len(rows) + print(f"Processed {processed_count} rows...") + +cursor.close() +conn.close() + +print(f"Total processed: {processed_count} rows") +``` + +**Explanation:** Uses DB-API for iterative retrieval of large result sets. Prevents memory issues and query timeouts by fetching data in batches. Essential for processing millions of rows. 
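+
+If the batch loop above is needed in several places, it can be wrapped in a generator so callers simply iterate over rows. A sketch using the same `pytd.dbapi` connection shown in Pattern 3; `process_event` and the batch size are placeholders:
+
+```python
+from pytd.dbapi import connect
+
+
+def iter_rows(client, sql, batch_size=10000):
+    """Yield result rows lazily, fetching from TD in fixed-size batches."""
+    conn = connect(client)
+    try:
+        cursor = conn.cursor()
+        cursor.execute(sql)
+        while True:
+            rows = cursor.fetchmany(batch_size)
+            if not rows:
+                break
+            for row in rows:
+                yield row
+    finally:
+        conn.close()
+
+
+# Usage:
+# for user_id, event_name, event_time, properties in iter_rows(client, query):
+#     process_event(user_id, event_name, event_time, properties)
+```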
+ +### Pattern 4: Multi-Database Operations + +```python +import pytd +import pandas as pd + +# Connect to different databases +source_client = pytd.Client(database='raw_data') +target_client = pytd.Client(database='analytics') + +# Query from source database +query = """ + SELECT + customer_id, + product_id, + purchase_date, + amount + FROM purchases + WHERE TD_INTERVAL(time, '-1d', 'JST') +""" + +result = source_client.query(query) +df = pd.DataFrame(result['data'], columns=result['columns']) + +# Enrich data by querying another source +product_query = "SELECT product_id, product_name, category FROM products" +products_result = source_client.query(product_query) +products_df = pd.DataFrame(products_result['data'], columns=products_result['columns']) + +# Join data +enriched_df = df.merge(products_df, on='product_id', how='left') + +# Calculate metrics +daily_summary = enriched_df.groupby(['category', 'purchase_date']).agg({ + 'amount': ['sum', 'mean', 'count'], + 'customer_id': 'nunique' +}).reset_index() + +daily_summary.columns = ['category', 'date', 'total_sales', 'avg_sale', 'transaction_count', 'unique_customers'] + +# Write to analytics database +target_client.load_table_from_dataframe( + daily_summary, + 'daily_category_sales', + writer='bulk_import', + if_exists='append' +) + +print(f"Loaded {len(daily_summary)} rows to analytics.daily_category_sales") +``` + +**Explanation:** Demonstrates working with multiple databases, joining data, performing aggregations, and writing to a different target database. + +### Pattern 5: Handling Time-based Data with TD Functions + +```python +import pytd +import pandas as pd +from datetime import datetime + +client = pytd.Client(database='events') + +# Query with TD time functions +query = """ + SELECT + TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date_jst, + COUNT(*) as event_count, + COUNT(DISTINCT user_id) as unique_users, + APPROX_PERCENTILE(session_duration, 0.5) as median_duration, + APPROX_PERCENTILE(session_duration, 0.95) as p95_duration + FROM user_sessions + WHERE TD_INTERVAL(time, '-7d', 'JST') + GROUP BY 1 + ORDER BY 1 DESC +""" + +result = client.query(query) +df = pd.DataFrame(result['data'], columns=result['columns']) + +# Convert date strings to datetime +df['date_jst'] = pd.to_datetime(df['date_jst']) + +# Add derived metrics +df['events_per_user'] = df['event_count'] / df['unique_users'] + +# Write summary back +client.load_table_from_dataframe( + df, + 'weekly_session_summary', + writer='bulk_import', + if_exists='overwrite' +) +``` + +**Explanation:** Shows proper use of TD time functions (TD_INTERVAL, TD_TIME_FORMAT) in queries and how to handle the results in pandas. + +## Writer Types Comparison + +pytd supports three writer methods for loading data: + +### 1. bulk_import (Default - Recommended) + +**Best for:** Most use cases, especially large datasets + +```python +client.load_table_from_dataframe( + df, + 'table_name', + writer='bulk_import', + if_exists='append' +) +``` + +**Characteristics:** +- ✓ Scalable to large datasets +- ✓ Memory efficient (streams data) +- ✓ No special permissions required +- ✓ Best balance of performance and simplicity +- ✗ Slower than Spark for very large datasets +- Uses CSV format internally + +**When to use:** Default choice for most data loads (100s of MB to GBs) + +### 2. 
insert_into + +**Best for:** Small datasets, real-time updates + +```python +client.load_table_from_dataframe( + df, + 'table_name', + writer='insert_into', + if_exists='append' +) +``` + +**Characteristics:** +- ✓ Simple, no dependencies +- ✓ Good for small datasets (<1000 rows) +- ✗ Not scalable (issues individual INSERT queries) +- ✗ Slow for large datasets +- ✗ Uses Presto query capacity +- Uses Presto INSERT INTO statements + +**When to use:** Only for small datasets or when you need immediate writes without bulk import delay + +### 3. spark (High Performance) + +**Best for:** Very large datasets, high-performance pipelines + +```python +from pytd.writer import SparkWriter + +writer = SparkWriter( + td_spark_path='/path/to/td-spark-assembly.jar' # Optional +) + +client.load_table_from_dataframe( + df, + 'table_name', + writer=writer, + if_exists='append' +) +``` + +**Characteristics:** +- ✓ Highest performance +- ✓ Direct writes to Plazma storage +- ✓ Best for very large datasets (10s of GBs+) +- ✗ Requires `pytd[spark]` installation +- ✗ Requires Plazma Public API access (contact support) +- ✗ Additional dependencies + +**When to use:** Large-scale data pipelines requiring maximum throughput + +**Enabling Spark Writer:** +1. Install: `pip install pytd[spark]` +2. Contact `support@treasuredata.com` to enable Plazma Public API access +3. (Optional) Download td-spark JAR for custom versions + +## Best Practices + +1. **Use Environment Variables for Credentials** + ```bash + export TD_API_KEY="your_api_key" + export TD_API_SERVER="https://api.treasuredata.com/" + ``` + Never hardcode API keys in scripts + +2. **Choose the Right Writer** + - `bulk_import`: Default choice for most scenarios + - `insert_into`: Only for small datasets (<1000 rows) + - `spark`: For very large datasets with proper setup + +3. **Use TD Time Functions in Queries** + ```python + # Good: Uses partition pruning + query = "SELECT * FROM table WHERE TD_INTERVAL(time, '-1d', 'JST')" + + # Avoid: Scans entire table + query = "SELECT * FROM table WHERE date = '2024-01-01'" + ``` + +4. **Handle Large Results with DB-API** + Use `pytd.dbapi` for queries returning millions of rows to avoid memory issues + +5. **Specify Database in Table Name** + ```python + # Explicit database (recommended) + client.load_table_from_dataframe(df, 'database.table') + + # Uses client's default database + client.load_table_from_dataframe(df, 'table') + ``` + +6. **Add Time Column for Partitioning** + ```python + df['time'] = pd.to_datetime(df['timestamp']).astype(int) // 10**9 + client.load_table_from_dataframe(df, 'table') + ``` + +7. **Use Presto for Analytics, Hive for Special Functions** + - Presto: Faster for most analytical queries + - Hive: Required for Hivemall, UDFs, some advanced features + +8. **Batch Processing for Large ETL** + Process data in chunks to avoid memory issues: + ```python + for chunk in pd.read_csv('large_file.csv', chunksize=100000): + # Process chunk + client.load_table_from_dataframe(chunk, 'table', if_exists='append') + ``` + +9. **Error Handling** + ```python + try: + result = client.query(query) + except Exception as e: + print(f"Query failed: {e}") + # Handle error appropriately + ``` + +10. 
**Close Connections in Long-Running Scripts** + ```python + from pytd.dbapi import connect + + conn = connect(client) + try: + # Use connection + cursor = conn.cursor() + cursor.execute(query) + # Process results + finally: + conn.close() + ``` + +## Common Issues and Solutions + +### Issue: Import Errors or Module Not Found + +**Symptoms:** +- `ModuleNotFoundError: No module named 'pytd'` +- `ImportError: cannot import name 'SparkWriter'` + +**Solutions:** +1. **Verify Installation** + ```bash + pip list | grep pytd + ``` + +2. **Install/Upgrade pytd** + ```bash + pip install --upgrade pytd + ``` + +3. **For Spark Support** + ```bash + pip install pytd[spark] + ``` + +4. **Check Python Version** + ```bash + python --version # Should be 3.9+ + ``` + +### Issue: Authentication Errors + +**Symptoms:** +- `Unauthorized: Invalid API key` +- `403 Forbidden` + +**Solutions:** +1. **Verify Environment Variables** + ```bash + echo $TD_API_KEY + echo $TD_API_SERVER + ``` + +2. **Check API Key Format** + ```python + # Verify API key is set correctly + import os + print(os.getenv('TD_API_KEY')) + ``` + +3. **Verify Regional Endpoint** + ```python + # US + endpoint = 'https://api.treasuredata.com/' + # Tokyo + endpoint = 'https://api.treasuredata.co.jp/' + # EU + endpoint = 'https://api.eu01.treasuredata.com/' + ``` + +4. **Check API Key Permissions** + - Ensure key has appropriate read/write permissions + - Regenerate key if necessary from TD console + +### Issue: Query Timeout or Memory Errors + +**Symptoms:** +- Query times out after several minutes +- `MemoryError` when fetching large results +- Connection drops during query execution + +**Solutions:** +1. **Use DB-API for Large Results** + ```python + from pytd.dbapi import connect + + conn = connect(client) + cursor = conn.cursor() + cursor.execute(query) + + # Fetch in batches + for row in cursor.fetchmany(10000): + process(row) + ``` + +2. **Add Time Filters for Partition Pruning** + ```python + query = """ + SELECT * FROM large_table + WHERE TD_INTERVAL(time, '-1d', 'JST') -- Add this! + """ + ``` + +3. **Limit Result Size** + ```python + query = "SELECT * FROM table WHERE ... LIMIT 100000" + ``` + +4. **Use Aggregations Instead of Raw Data** + ```python + # Instead of fetching all rows + query = "SELECT * FROM table" + + # Aggregate first + query = """ + SELECT date, user_id, COUNT(*) as cnt + FROM table + GROUP BY 1, 2 + """ + ``` + +### Issue: DataFrame Upload Fails + +**Symptoms:** +- `ValueError: DataFrame is empty` +- Type errors during upload +- Data corruption in uploaded table + +**Solutions:** +1. **Check DataFrame is Not Empty** + ```python + if df.empty: + print("DataFrame is empty, skipping upload") + else: + client.load_table_from_dataframe(df, 'table') + ``` + +2. **Handle Data Types Properly** + ```python + # Convert timestamps to Unix epoch + df['time'] = pd.to_datetime(df['timestamp']).astype(int) // 10**9 + + # Handle NaN values + df['amount'] = df['amount'].fillna(0) + + # Convert to appropriate types + df['user_id'] = df['user_id'].astype(str) + df['count'] = df['count'].astype(int) + ``` + +3. **Check Column Names** + ```python + # TD column names should be lowercase and use underscores + df.columns = df.columns.str.lower().str.replace(' ', '_') + ``` + +4. **Remove Invalid Characters** + ```python + # Remove or replace problematic characters + df = df.applymap(lambda x: str(x).replace('\x00', '') if isinstance(x, str) else x) + ``` + +5. 
**Try Different Writer** + ```python + # If bulk_import fails, try insert_into for debugging + client.load_table_from_dataframe( + df.head(10), # Test with small sample + 'table', + writer='insert_into' + ) + ``` + +### Issue: Spark Writer Not Working + +**Symptoms:** +- `ImportError: Spark writer not available` +- Spark job fails +- Permission denied errors + +**Solutions:** +1. **Install Spark Dependencies** + ```bash + pip install pytd[spark] + ``` + +2. **Enable Plazma Public API** + - Contact `support@treasuredata.com` + - Request Plazma Public API access for your account + +3. **Specify JAR Path (if needed)** + ```python + from pytd.writer import SparkWriter + + writer = SparkWriter( + td_spark_path='/path/to/td-spark-assembly.jar' + ) + ``` + +4. **Check Permissions** + - Ensure API key has write access to target database + - Verify Plazma access is enabled + +## Advanced Topics + +### Custom Query Options + +```python +# Query with custom parameters +result = client.query( + 'SELECT * FROM table', + engine='presto', + priority=1, # Higher priority (1-2, default 0) + retry_limit=3 +) +``` + +### Working with Job Status + +```python +# Start query asynchronously +job = client.query('SELECT COUNT(*) FROM large_table', wait=False) + +# Check job status +print(f"Job ID: {job.job_id}") +print(f"Status: {job.status()}") + +# Wait for completion +job.wait() + +# Get results +if job.success(): + result = job.result() +else: + print(f"Job failed: {job.error()}") +``` + +### Custom Writers + +```python +from pytd.writer import BulkImportWriter + +# Configure writer with custom options +writer = BulkImportWriter( + chunk_size=10000, # Rows per chunk + time_column='time' # Specify time column +) + +client.load_table_from_dataframe( + df, + 'table', + writer=writer, + if_exists='append' +) +``` + +### Migrating from pandas-td + +If you have existing code using the deprecated `pandas-td` library: + +**Before (pandas-td):** +```python +import pandas_td as td + +con = td.connect(apikey='your_api_key', endpoint='https://api.treasuredata.com/') +df = td.read_td('SELECT * FROM sample_datasets.nasdaq', con) +``` + +**After (pytd):** +```python +import pytd.pandas_td as td + +con = td.connect(apikey='your_api_key', endpoint='https://api.treasuredata.com/') +df = td.read_td('SELECT * FROM sample_datasets.nasdaq', con) +``` + +Or use the modern pytd API: +```python +import pytd +import pandas as pd + +client = pytd.Client(database='sample_datasets') +result = client.query('SELECT * FROM nasdaq') +df = pd.DataFrame(result['data'], columns=result['columns']) +``` + +## Testing and Development + +### Test Connection + +```python +import pytd + +try: + client = pytd.Client(database='sample_datasets') + result = client.query('SELECT 1 as test') + print("Connection successful!") + print(result) +except Exception as e: + print(f"Connection failed: {e}") +``` + +### Verify Data Upload + +```python +import pandas as pd +import pytd + +# Create test data +test_df = pd.DataFrame({ + 'id': [1, 2, 3], + 'name': ['Alice', 'Bob', 'Charlie'], + 'value': [100, 200, 300], + 'time': [1704067200, 1704153600, 1704240000] # Unix timestamps +}) + +client = pytd.Client(database='test_db') + +# Upload +print("Uploading test data...") +client.load_table_from_dataframe( + test_df, + 'test_table', + writer='bulk_import', + if_exists='overwrite' +) + +# Verify +print("Verifying upload...") +result = client.query('SELECT * FROM test_table ORDER BY id') +verify_df = pd.DataFrame(result['data'], columns=result['columns']) + 
+print("\nUploaded data:") +print(verify_df) + +# Check counts match +assert len(test_df) == len(verify_df), "Row count mismatch!" +print("\nVerification successful!") +``` + +### Performance Testing + +```python +import pytd +import pandas as pd +import time + +client = pytd.Client(database='test_db') + +# Generate test data +df = pd.DataFrame({ + 'id': range(100000), + 'value': range(100000), + 'time': int(time.time()) +}) + +# Test bulk_import +start = time.time() +client.load_table_from_dataframe(df, 'perf_test_bulk', writer='bulk_import', if_exists='overwrite') +bulk_time = time.time() - start +print(f"bulk_import: {bulk_time:.2f}s for {len(df)} rows") + +# Test insert_into (small sample only!) +small_df = df.head(100) +start = time.time() +client.load_table_from_dataframe(small_df, 'perf_test_insert', writer='insert_into', if_exists='overwrite') +insert_time = time.time() - start +print(f"insert_into: {insert_time:.2f}s for {len(small_df)} rows") +``` + +## Jupyter Notebook Integration + +pytd works seamlessly with Jupyter notebooks: + +```python +# Notebook cell 1: Setup +import pytd +import pandas as pd +import matplotlib.pyplot as plt + +client = pytd.Client(database='analytics') + +# Notebook cell 2: Query data +query = """ + SELECT + TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date, + COUNT(*) as events + FROM user_events + WHERE TD_INTERVAL(time, '-30d', 'JST') + GROUP BY 1 + ORDER BY 1 +""" + +result = client.query(query) +df = pd.DataFrame(result['data'], columns=result['columns']) +df['date'] = pd.to_datetime(df['date']) + +# Notebook cell 3: Visualize +plt.figure(figsize=(12, 6)) +plt.plot(df['date'], df['events']) +plt.title('Daily Events - Last 30 Days') +plt.xlabel('Date') +plt.ylabel('Event Count') +plt.xticks(rotation=45) +plt.tight_layout() +plt.show() + +# Notebook cell 4: Write results back +summary = df.describe() +# Process and save summary back to TD if needed +``` + +## Resources + +- **Documentation**: https://pytd-doc.readthedocs.io/ +- **GitHub Repository**: https://github.com/treasure-data/pytd +- **PyPI Package**: https://pypi.org/project/pytd/ +- **TD Python Guide**: https://docs.treasuredata.com/ +- **Example Notebooks**: See GitHub repository for Google Colab examples + +## Related Skills + +- **trino**: Understanding Trino SQL syntax for queries in pytd +- **hive**: Using Hive-specific functions and syntax +- **digdag**: Orchestrating Python scripts using pytd in workflows +- **td-javascript-sdk**: Browser-based data collection (frontend) vs pytd (backend/analytics) + +## Comparison with Other Tools + +| Tool | Purpose | When to Use | +|------|---------|-------------| +| **pytd** | Full-featured Python driver | Analytics, data pipelines, pandas integration | +| **td-client-python** | Basic REST API wrapper | Simple CRUD, when pytd is too heavy | +| **pandas-td** (deprecated) | Legacy pandas integration | Don't use - migrate to pytd | +| **TD Toolbelt** | CLI tool | Command-line operations, shell scripts | + +**Recommendation:** Use pytd for all Python-based analytical work and ETL pipelines. Use td-client-python only for basic REST API operations. + +--- + +*Last updated: 2025-01 | pytd version: Latest (Python 3.9+)*