diff --git a/scenarios/import_td_logs/README.md b/scenarios/import_td_logs/README.md new file mode 100644 index 00000000..8002fdd4 --- /dev/null +++ b/scenarios/import_td_logs/README.md @@ -0,0 +1,38 @@ +# Workflow: Import Treasure Data Logs from Data Landing Area +This example shows how you can use a workflow to ingest Treasure Data logs from Data Landing Areas into your Treasure Data account. +This is an opt-in feature. Please contact your Customer Success rep or Technical Support if you are interested in this feature. + +# How to Run +## Requirement +The workflow requires that the Data Landing Areas feature is enabled in your Treasure Data account and that you have received the User ID used to access it. + +## Steps +First, edit the configuration. You can find the following settings in the `import_td_logs.dig` file. + +| Parameter | Description | +| ---- | ---- | +| api_endpoint | The endpoint of the Treasure Data API. See this [document](https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints). (e.g. https://api.treasuredata.com) | +| dla_host | The hostname of the Data Landing Area (e.g. dla1.treasuredata-co.jp) | +| user_id | Your user_id received from TD when you enabled the Data Landing Areas feature | +| site | The site of your account (e.g. aws, aws-tokyo, eu01, ap02) | +| account_id | Your TD account_id | +| query_logs_table | The table name where query logs are stored (e.g. query_logs) | +| workflow_logs_table | The table name where workflow logs are stored (e.g. workflow_logs) | +| users_table | The table name where user data is stored (e.g. users) | + +Next, upload the workflow to Treasure Data. + + # Upload + $ td wf push import_td_logs + +Set secrets with the private key that pairs with the public key you gave to TD when you enabled the Data Landing Areas feature. 
+ + $ td wf secrets --project import_td_logs --set sftp.dla_secret_key_file=@~/.ssh/id_rsa_dla + $ td wf secrets --project import_td_logs --set td.apikey + +You can trigger the session manually to watch it execute. + + # Run + $ td wf start import_td_logs import_td_logs --session now + +If you have any questions, contact support@treasuredata.com. diff --git a/scenarios/import_td_logs/config/query_log.yml b/scenarios/import_td_logs/config/query_log.yml new file mode 100644 index 00000000..ad6d2b19 --- /dev/null +++ b/scenarios/import_td_logs/config/query_log.yml @@ -0,0 +1,43 @@ +in: + type: sftp + host: ${dla_host} + user: ${user_id} + secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"} + path_prefix: "/treasure-data-logs/${site}/${account_id}/query_logs/v1/data.csv" + parser: + charset: UTF-8 + newline: CRLF + type: csv + delimiter: "," + quote: "\"" + escape: "\"" + trim_if_not_quoted: false + skip_header_lines: 1 + allow_extra_columns: false + allow_optional_columns: false + columns: + - {name: date, type: string} + - {name: account_id, type: string} + - {name: user_id, type: string} + - {name: job_id, type: long} + - {name: created_at, type: string} + - {name: scheduled_at, type: string} + - {name: start_at, type: string} + - {name: end_at, type: string} + - {name: queued_sec, type: long} + - {name: running_sec, type: long} + - {name: result_type, type: string} + - {name: load_type, type: string} + - {name: records, type: long} + - {name: type, type: string} + - {name: query_status, type: string} + - {name: result_size, type: long} + - {name: query_id, type: string} + - {name: split_hours, type: double} + - {name: average_hive_cores, type: double} + - {name: project_name, type: string} + - {name: workflow_name, type: string} + - {name: task_id, type: string} + - {name: time, type: long} +out: + mode: replace \ No newline at end of file diff --git a/scenarios/import_td_logs/config/workflow_log.yml 
b/scenarios/import_td_logs/config/workflow_log.yml new file mode 100644 index 00000000..67df0d9b --- /dev/null +++ b/scenarios/import_td_logs/config/workflow_log.yml @@ -0,0 +1,41 @@ +in: + type: sftp + host: ${dla_host} + user: ${user_id} + secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"} + path_prefix: "/treasure-data-logs/${site}/${account_id}/workflow_logs/v1/data.csv" + parser: + charset: UTF-8 + newline: CRLF + type: csv + delimiter: "," + quote: "\"" + escape: "\"" + trim_if_not_quoted: false + skip_header_lines: 1 + allow_extra_columns: false + allow_optional_columns: false + columns: + - {name: account_id, type: string} + - {name: project_id, type: string} + - {name: workflow_id, type: string} + - {name: session_id, type: string} + - {name: attempt_id, type: string} + - {name: task_id, type: string} + - {name: user_id, type: string} + - {name: project_name, type: string} + - {name: workflow_name, type: string} + - {name: timezone, type: string} + - {name: session_time, type: string} + - {name: attempt_created_at, type: string} + - {name: attempt_finished_at, type: string} + - {name: task_name, type: string} + - {name: task_start_at, type: string} + - {name: task_end_at, type: string} + - {name: attempt_running_sec, type: string} + - {name: task_running_sec, type: string} + - {name: state, type: string} + - {name: date, type: string} + - {name: time, type: long} +out: + mode: replace \ No newline at end of file diff --git a/scenarios/import_td_logs/import_td_logs.dig b/scenarios/import_td_logs/import_td_logs.dig new file mode 100644 index 00000000..53618992 --- /dev/null +++ b/scenarios/import_td_logs/import_td_logs.dig @@ -0,0 +1,43 @@ +timezone: UTC + +schedule: + daily>: 03:00:00 + +_export: + td: + database: treaure-data-logs + api_endpoint: https://api.treasuredata.com + dla_host: dla1.treasuredata-co.jp + user_id: abcdefg012345 + site: aws + account_id: 1 + query_logs_table: query_logs + workflow_logs_table: workflow_logs + 
def import_users(database, table, api_endpoint):
    """Fetch the account's user list from the TD API and load it into a table.

    The API key is taken from the module-level ``td_apikey`` (read from the
    ``TD_API_KEY`` environment variable).

    Args:
        database: Name of the Treasure Data database to write to.
        table: Name of the destination table; its contents are overwritten.
        api_endpoint: Base URL of the Treasure Data API, e.g.
            ``https://api.treasuredata.com``. Used both for the user-list
            request and for the pytd client that writes the table.

    Raises:
        RuntimeError: If no API key is available.
        requests.HTTPError: If the user-list request returns a 4xx/5xx status.
    """
    if not td_apikey:
        raise RuntimeError('TD_API_KEY environment variable is not set')

    # get users data
    # Tolerate a trailing slash on the configured endpoint.
    url = '{}/v3/user/list'.format(api_endpoint.rstrip('/'))
    headers = {'Authorization': 'TD1 {}'.format(td_apikey)}
    # A timeout keeps the workflow task from hanging forever on a stalled
    # connection.
    r = requests.get(url, headers=headers, timeout=60)
    # Surface auth/server errors here instead of as a confusing KeyError when
    # the error payload has no 'users' key.
    r.raise_for_status()

    # write users data
    df = pd.json_normalize(r.json(), record_path=['users'])
    # Pass the endpoint explicitly so the write goes to the same site the
    # users were read from, not pytd's default endpoint.
    client = pytd.Client(
        apikey=td_apikey, endpoint=api_endpoint, database=database)
    client.load_table_from_dataframe(
        df, table, writer='bulk_import', if_exists='overwrite')