diff --git a/apps/docs/components/icons.tsx b/apps/docs/components/icons.tsx index 8adfadbf9a..9dbf41416a 100644 --- a/apps/docs/components/icons.tsx +++ b/apps/docs/components/icons.tsx @@ -124,6 +124,29 @@ export function ConditionalIcon(props: SVGProps) { ) } +export function CredentialIcon(props: SVGProps) { + return ( + + + + + + + ) +} + export function NoteIcon(props: SVGProps) { return ( ) { ) } +export function DagsterIcon(props: SVGProps) { + return ( + + + + + ) +} + export function DatabricksIcon(props: SVGProps) { return ( diff --git a/apps/docs/components/ui/icon-mapping.ts b/apps/docs/components/ui/icon-mapping.ts index 94c08904f6..11605cae41 100644 --- a/apps/docs/components/ui/icon-mapping.ts +++ b/apps/docs/components/ui/icon-mapping.ts @@ -30,6 +30,7 @@ import { CloudflareIcon, ConfluenceIcon, CursorIcon, + DagsterIcon, DatabricksIcon, DatadogIcon, DevinIcon, @@ -213,6 +214,7 @@ export const blockTypeToIconMap: Record = { cloudflare: CloudflareIcon, confluence_v2: ConfluenceIcon, cursor_v2: CursorIcon, + dagster: DagsterIcon, databricks: DatabricksIcon, datadog: DatadogIcon, devin: DevinIcon, diff --git a/apps/docs/content/docs/en/tools/dagster.mdx b/apps/docs/content/docs/en/tools/dagster.mdx new file mode 100644 index 0000000000..8c4123eb6a --- /dev/null +++ b/apps/docs/content/docs/en/tools/dagster.mdx @@ -0,0 +1,141 @@ +--- +title: Dagster +description: Orchestrate data pipelines and manage job runs on Dagster+ +--- + +import { BlockInfoCard } from "@/components/ui/block-info-card" + + + +## Usage Instructions + +Connect to Dagster+ to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. Requires a Dagster Cloud API token. + + + +## Tools + +### `dagster_launch_run` + +Launch a Dagster job run in your Dagster+ deployment. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain, e.g., "myorg"\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `jobName` | string | Yes | Name of the job to launch | +| `runConfigJson` | string | No | Run configuration as a JSON object \(optional\) | +| `tags` | string | No | Tags as a JSON array of \{key, value\} objects \(optional\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | The globally unique ID of the launched run | + +### `dagster_get_run` + +Get the status and details of a Dagster run by its ID. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `runId` | string | Yes | The ID of the run to retrieve | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | Run ID | +| `jobName` | string | Name of the job this run belongs to | +| `status` | string | Run status \(QUEUED, NOT_STARTED, STARTING, MANAGED, STARTED, SUCCESS, FAILURE, CANCELING, CANCELED\) | +| `startTime` | number | Run start time as Unix timestamp | +| `endTime` | number | Run end time as Unix timestamp | +| `runConfigYaml` | string | Run configuration as YAML | +| `tags` | json | Run tags as array of \{key, value\} objects | + +### `dagster_list_runs` + +List recent Dagster runs, optionally filtered by job name. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `jobName` | string | No | Filter runs by job name \(optional\) | +| `limit` | number | No | Maximum number of runs to return \(default 20\) | +| `cursor` | string | No | Pagination cursor from a previous list_runs response | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runs` | json | Array of runs with runId, jobName, status, startTime, endTime | +| ↳ `runId` | string | Run ID | +| ↳ `jobName` | string | Job name | +| ↳ `status` | string | Run status | +| ↳ `startTime` | number | Start time as Unix timestamp | +| ↳ `endTime` | number | End time as Unix timestamp | +| `cursor` | string | Pagination cursor to retrieve the next page | +| `hasMore` | boolean | Whether more runs are available | + +### `dagster_list_jobs` + +List all jobs across repositories in a Dagster+ deployment. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `repositoryLocationName` | string | No | Filter by repository location name \(optional\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `jobs` | json | Array of jobs with name, repositoryName, repositoryLocationName, and description | +| ↳ `name` | string | Job name | +| ↳ `repositoryName` | string | Repository name | +| ↳ `repositoryLocationName` | string | Repository location name | +| ↳ `description` | string | Job description | + +### `dagster_terminate_run` + +Terminate an in-progress Dagster run. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `runId` | string | Yes | The ID of the run to terminate | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `success` | boolean | Whether the run was successfully terminated | +| `runId` | string | The ID of the terminated run | +| `message` | string | Error or status message if termination failed | + + diff --git a/apps/docs/content/docs/en/tools/meta.json b/apps/docs/content/docs/en/tools/meta.json index 220f7a351c..5c27d1c8c4 100644 --- a/apps/docs/content/docs/en/tools/meta.json +++ b/apps/docs/content/docs/en/tools/meta.json @@ -25,6 +25,7 @@ "cloudflare", "confluence", "cursor", + "dagster", "databricks", "datadog", "devin", diff --git a/apps/sim/app/(landing)/integrations/data/icon-mapping.ts b/apps/sim/app/(landing)/integrations/data/icon-mapping.ts index 4e78266631..a14ec3bf79 100644 --- a/apps/sim/app/(landing)/integrations/data/icon-mapping.ts +++ b/apps/sim/app/(landing)/integrations/data/icon-mapping.ts @@ -30,6 +30,7 @@ import { CloudflareIcon, ConfluenceIcon, CursorIcon, + DagsterIcon, DatabricksIcon, DatadogIcon, DevinIcon, @@ -213,6 +214,7 @@ export const blockTypeToIconMap: Record = { cloudflare: CloudflareIcon, confluence_v2: ConfluenceIcon, cursor_v2: CursorIcon, + dagster: DagsterIcon, databricks: DatabricksIcon, datadog: DatadogIcon, devin: DevinIcon, diff --git a/apps/sim/app/(landing)/integrations/data/integrations.json b/apps/sim/app/(landing)/integrations/data/integrations.json index 4cbd8dd5da..984440358a 100644 --- a/apps/sim/app/(landing)/integrations/data/integrations.json +++ b/apps/sim/app/(landing)/integrations/data/integrations.json @@ -2243,6 +2243,45 @@ "integrationType": "developer-tools", "tags": ["agentic", "automation"] }, + { + "type": "dagster", + "slug": "dagster", + "name": "Dagster", + "description": "Orchestrate data pipelines and manage job runs on Dagster+", + "longDescription": "Connect to Dagster+ to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. Requires a Dagster Cloud API token.", + "bgColor": "#191A23", + "iconName": "DagsterIcon", + "docsUrl": "https://docs.sim.ai/tools/dagster", + "operations": [ + { + "name": "Launch Run", + "description": "Launch a Dagster job run in your Dagster+ deployment." + }, + { + "name": "Get Run", + "description": "Get the status and details of a Dagster run by its ID." + }, + { + "name": "List Runs", + "description": "List recent Dagster runs, optionally filtered by job name." + }, + { + "name": "List Jobs", + "description": "List all jobs across repositories in a Dagster+ deployment." + }, + { + "name": "Terminate Run", + "description": "Terminate an in-progress Dagster run." + } + ], + "operationCount": 5, + "triggers": [], + "triggerCount": 0, + "authType": "api-key", + "category": "tools", + "integrationType": "automation", + "tags": ["data-analytics", "cloud", "automation"] + }, { "type": "databricks", "slug": "databricks", diff --git a/apps/sim/blocks/blocks/dagster.ts b/apps/sim/blocks/blocks/dagster.ts new file mode 100644 index 0000000000..fbf7541148 --- /dev/null +++ b/apps/sim/blocks/blocks/dagster.ts @@ -0,0 +1,196 @@ +import { DagsterIcon } from '@/components/icons' +import type { BlockConfig } from '@/blocks/types' +import { IntegrationType } from '@/blocks/types' +import type { DagsterResponse } from '@/tools/dagster/types' + +export const DagsterBlock: BlockConfig = { + type: 'dagster', + name: 'Dagster', + description: 'Orchestrate data pipelines and manage job runs with Dagster', + longDescription: + 'Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. API token only required for Dagster+.', + docsLink: 'https://docs.sim.ai/tools/dagster', + category: 'tools', + integrationType: IntegrationType.Automation, + tags: ['data-analytics', 'automation'], + bgColor: '#191A23', + icon: DagsterIcon, + + subBlocks: [ + { + id: 'operation', + title: 'Operation', + type: 'dropdown', + options: [ + { label: 'Launch Run', id: 'launch_run' }, + { label: 'Get Run', id: 'get_run' }, + { label: 'List Runs', id: 'list_runs' }, + { label: 'List Jobs', id: 'list_jobs' }, + { label: 'Terminate Run', id: 'terminate_run' }, + ], + value: () => 'launch_run', + }, + + // ── Launch Run ── + { + id: 'repositoryLocationName', + title: 'Repository Location', + type: 'short-input', + placeholder: 'e.g., my_code_location', + condition: { field: 'operation', value: 'launch_run' }, + required: { field: 'operation', value: 'launch_run' }, + }, + { + id: 'repositoryName', + title: 'Repository Name', + type: 'short-input', + placeholder: 'e.g., __repository__', + condition: { field: 'operation', value: 'launch_run' }, + required: { field: 'operation', value: 'launch_run' }, + }, + { + id: 'jobName', + title: 'Job Name', + type: 'short-input', + placeholder: 'e.g., my_pipeline_job', + condition: { field: 'operation', value: 'launch_run' }, + required: { field: 'operation', value: 'launch_run' }, + }, + { + id: 'runConfigJson', + title: 'Run Config', + type: 'code', + placeholder: '{"ops": {"my_op": {"config": {"key": "value"}}}}', + condition: { field: 'operation', value: 'launch_run' }, + mode: 'advanced', + wandConfig: { + enabled: true, + prompt: `Generate a Dagster run config JSON object based on the user's description. + +Examples: +- "set partition date to 2024-01-15" -> {"ops": {"load_partition": {"config": {"partition_date": "2024-01-15"}}}} +- "run with debug logging" -> {"execution": {"multiprocess": {"config": {"max_concurrent": 1}}}} + +Return ONLY a valid JSON object - no explanations, no extra text.`, + placeholder: 'Describe the run configuration...', + generationType: 'json-object', + }, + }, + { + id: 'tags', + title: 'Tags', + type: 'code', + placeholder: '[{"key": "env", "value": "prod"}]', + condition: { field: 'operation', value: 'launch_run' }, + mode: 'advanced', + }, + + // ── Get Run / Terminate Run ── + { + id: 'runId', + title: 'Run ID', + type: 'short-input', + placeholder: 'e.g., abc123def456', + condition: { field: 'operation', value: ['get_run', 'terminate_run'] }, + required: { field: 'operation', value: ['get_run', 'terminate_run'] }, + }, + + // ── List Runs ── + { + id: 'listRunsJobName', + title: 'Job Name Filter', + type: 'short-input', + placeholder: 'Filter by job name (optional)', + condition: { field: 'operation', value: 'list_runs' }, + }, + { + id: 'statuses', + title: 'Status Filter', + type: 'short-input', + placeholder: 'e.g. SUCCESS,FAILURE (optional)', + condition: { field: 'operation', value: 'list_runs' }, + mode: 'advanced', + }, + { + id: 'limit', + title: 'Limit', + type: 'short-input', + placeholder: '20', + condition: { field: 'operation', value: 'list_runs' }, + mode: 'advanced', + }, + + // ── Connection (common) ── + { + id: 'host', + title: 'Host', + type: 'short-input', + placeholder: 'http://localhost:3001 or https://myorg.dagster.cloud/prod', + required: true, + }, + { + id: 'apiKey', + title: 'API Token', + type: 'short-input', + placeholder: 'Dagster+ API token (leave blank for OSS / self-hosted)', + password: true, + }, + ], + + tools: { + access: [ + 'dagster_launch_run', + 'dagster_get_run', + 'dagster_list_runs', + 'dagster_list_jobs', + 'dagster_terminate_run', + ], + config: { + tool: (params) => `dagster_${params.operation}`, + params: (params) => { + const result: Record = {} + if (params.limit != null && params.limit !== '') result.limit = Number(params.limit) + // Map list_runs job name filter to the correct param + if (params.listRunsJobName) result.jobName = params.listRunsJobName + return result + }, + }, + }, + + inputs: { + operation: { type: 'string', description: 'Operation to perform' }, + host: { type: 'string', description: 'Dagster host URL' }, + apiKey: { + type: 'string', + description: 'Dagster Cloud API token (optional for self-hosted instances)', + }, + repositoryLocationName: { type: 'string', description: 'Repository location name' }, + repositoryName: { type: 'string', description: 'Repository name' }, + jobName: { type: 'string', description: 'Job name to launch' }, + runConfigJson: { type: 'string', description: 'Run configuration as JSON' }, + tags: { type: 'string', description: 'Tags as JSON array of {key, value} objects' }, + runId: { type: 'string', description: 'Run ID' }, + listRunsJobName: { type: 'string', description: 'Filter list_runs by job name' }, + statuses: { type: 'string', description: 'Comma-separated run statuses to filter by' }, + limit: { type: 'number', description: 'Maximum results to return' }, + }, + + outputs: { + // Launch Run + runId: { type: 'string', description: 'Launched or queried run ID' }, + // Get Run + jobName: { type: 'string', description: 'Job name the run belongs to' }, + status: { type: 'string', description: 'Run status' }, + startTime: { type: 'number', description: 'Run start time (Unix timestamp)' }, + endTime: { type: 'number', description: 'Run end time (Unix timestamp)' }, + runConfigYaml: { type: 'string', description: 'Run configuration as YAML' }, + tags: { type: 'json', description: 'Run tags as array of {key, value} objects' }, + // List Runs + runs: { type: 'json', description: 'List of runs' }, + // List Jobs + jobs: { type: 'json', description: 'List of jobs across all repositories' }, + // Terminate Run + success: { type: 'boolean', description: 'Whether termination succeeded' }, + message: { type: 'string', description: 'Termination status or error message' }, + }, +} diff --git a/apps/sim/blocks/registry.ts b/apps/sim/blocks/registry.ts index 091408b7e9..2b66cb5940 100644 --- a/apps/sim/blocks/registry.ts +++ b/apps/sim/blocks/registry.ts @@ -28,6 +28,7 @@ import { ConditionBlock } from '@/blocks/blocks/condition' import { ConfluenceBlock, ConfluenceV2Block } from '@/blocks/blocks/confluence' import { CredentialBlock } from '@/blocks/blocks/credential' import { CursorBlock, CursorV2Block } from '@/blocks/blocks/cursor' +import { DagsterBlock } from '@/blocks/blocks/dagster' import { DatabricksBlock } from '@/blocks/blocks/databricks' import { DatadogBlock } from '@/blocks/blocks/datadog' import { DevinBlock } from '@/blocks/blocks/devin' @@ -249,6 +250,7 @@ export const registry: Record = { confluence_v2: ConfluenceV2Block, cursor: CursorBlock, cursor_v2: CursorV2Block, + dagster: DagsterBlock, databricks: DatabricksBlock, datadog: DatadogBlock, devin: DevinBlock, diff --git a/apps/sim/components/icons.tsx b/apps/sim/components/icons.tsx index 49dda618cc..86417a78fa 100644 --- a/apps/sim/components/icons.tsx +++ b/apps/sim/components/icons.tsx @@ -4868,6 +4868,77 @@ export function SSHIcon(props: SVGProps) { ) } +export function DagsterIcon(props: SVGProps) { + return ( + + + + + + + + + + + + + + + + + + + ) +} + export function DatabricksIcon(props: SVGProps) { return ( diff --git a/apps/sim/tools/dagster/get_run.ts b/apps/sim/tools/dagster/get_run.ts new file mode 100644 index 0000000000..b515d839a6 --- /dev/null +++ b/apps/sim/tools/dagster/get_run.ts @@ -0,0 +1,145 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' +import type { DagsterGetRunParams, DagsterGetRunResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +/** Fields selected on `runOrError` when the union resolves to `Run`. */ +interface DagsterGetRunGraphqlRun { + runId: string + jobName: string | null + status: string + startTime: number | null + endTime: number | null + runConfigYaml: string | null + tags: Array<{ key: string; value: string }> | null +} + +const GET_RUN_QUERY = ` + query GetRun($runId: ID!) { + runOrError(runId: $runId) { + ... on Run { + runId + jobName + status + startTime + endTime + runConfigYaml + tags { + key + value + } + } + ... on Error { + __typename + message + } + } + } +` + +export const getRunTool: ToolConfig = { + id: 'dagster_get_run', + name: 'Dagster Get Run', + description: 'Get the status and details of a Dagster run by its ID.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3000)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to retrieve', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: GET_RUN_QUERY, + variables: { runId: params.runId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ runOrError?: unknown }>(response) + + const raw = data.data?.runOrError + if (!raw || typeof raw !== 'object') throw new Error('Unexpected response from Dagster') + + if (!('runId' in raw) || typeof (raw as { runId: unknown }).runId !== 'string') { + throw new Error( + dagsterUnionErrorMessage(raw as { message?: string }, 'Run not found or Dagster error') + ) + } + + const run = raw as DagsterGetRunGraphqlRun + + return { + success: true, + output: { + runId: run.runId, + jobName: run.jobName ?? null, + status: run.status, + startTime: run.startTime ?? null, + endTime: run.endTime ?? null, + runConfigYaml: run.runConfigYaml ?? null, + tags: run.tags ?? null, + }, + } + }, + + outputs: { + runId: { + type: 'string', + description: 'Run ID', + }, + jobName: { + type: 'string', + description: 'Name of the job this run belongs to', + optional: true, + }, + status: { + type: 'string', + description: + 'Run status (QUEUED, NOT_STARTED, STARTING, MANAGED, STARTED, SUCCESS, FAILURE, CANCELING, CANCELED)', + }, + startTime: { + type: 'number', + description: 'Run start time as Unix timestamp', + optional: true, + }, + endTime: { + type: 'number', + description: 'Run end time as Unix timestamp', + optional: true, + }, + runConfigYaml: { + type: 'string', + description: 'Run configuration as YAML', + optional: true, + }, + tags: { + type: 'json', + description: 'Run tags as array of {key, value} objects', + optional: true, + }, + }, +} diff --git a/apps/sim/tools/dagster/graphql.ts b/apps/sim/tools/dagster/graphql.ts new file mode 100644 index 0000000000..b41fc13533 --- /dev/null +++ b/apps/sim/tools/dagster/graphql.ts @@ -0,0 +1,41 @@ +/** + * Parses a Dagster GraphQL JSON body and throws if the HTTP status is not OK or the payload + * contains top-level GraphQL errors. + * + * Field errors should be requested with `... on Error { __typename message }` (or at least + * `message`) so union failures are not returned as empty objects. + */ +export async function parseDagsterGraphqlResponse>( + response: Response +): Promise<{ data?: TData }> { + let payload: { + data?: TData + errors?: ReadonlyArray<{ message?: string }> + } + try { + payload = (await response.json()) as { + data?: TData + errors?: ReadonlyArray<{ message?: string }> + } + } catch { + throw new Error('Invalid JSON response from Dagster') + } + if (!response.ok) { + throw new Error(payload.errors?.[0]?.message || 'Dagster GraphQL request failed') + } + if (payload.errors?.length) { + throw new Error(payload.errors[0]?.message ?? 'Dagster GraphQL request failed') + } + return { data: payload.data } +} + +/** + * Message from a field that includes `... on Error { message }`, or a fallback when the + * payload is not a GraphQL `Error` type with a string message. + */ +export function dagsterUnionErrorMessage( + result: { message?: string } | undefined, + fallback: string +): string { + return typeof result?.message === 'string' ? result.message : fallback +} diff --git a/apps/sim/tools/dagster/index.ts b/apps/sim/tools/dagster/index.ts new file mode 100644 index 0000000000..d65455af2a --- /dev/null +++ b/apps/sim/tools/dagster/index.ts @@ -0,0 +1,11 @@ +import { getRunTool } from '@/tools/dagster/get_run' +import { launchRunTool } from '@/tools/dagster/launch_run' +import { listJobsTool } from '@/tools/dagster/list_jobs' +import { listRunsTool } from '@/tools/dagster/list_runs' +import { terminateRunTool } from '@/tools/dagster/terminate_run' + +export const dagsterLaunchRunTool = launchRunTool +export const dagsterGetRunTool = getRunTool +export const dagsterListRunsTool = listRunsTool +export const dagsterListJobsTool = listJobsTool +export const dagsterTerminateRunTool = terminateRunTool diff --git a/apps/sim/tools/dagster/launch_run.ts b/apps/sim/tools/dagster/launch_run.ts new file mode 100644 index 0000000000..9ed534e398 --- /dev/null +++ b/apps/sim/tools/dagster/launch_run.ts @@ -0,0 +1,170 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' +import type { DagsterLaunchRunParams, DagsterLaunchRunResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +interface LaunchRunResult { + type: string + run?: { runId: string } + message?: string +} + +function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { + const varDefs = [ + '$repositoryLocationName: String!', + '$repositoryName: String!', + '$jobName: String!', + ] + if (hasConfig) varDefs.push('$runConfigData: RunConfigData') + if (hasTags) varDefs.push('$tags: [ExecutionTag!]') + + const execParams = [ + `selector: { + repositoryLocationName: $repositoryLocationName + repositoryName: $repositoryName + jobName: $jobName + }`, + ] + if (hasConfig) execParams.push('runConfigData: $runConfigData') + if (hasTags) execParams.push('executionMetadata: { tags: $tags }') + + return ` + mutation LaunchRun(${varDefs.join(', ')}) { + launchRun( + executionParams: { + ${execParams.join('\n ')} + } + ) { + type: __typename + ... on LaunchRunSuccess { + run { + runId + } + } + ... on Error { + __typename + message + } + } + } + ` +} + +export const launchRunTool: ToolConfig = { + id: 'dagster_launch_run', + name: 'Dagster Launch Run', + description: 'Launch a job run on a Dagster instance.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3000)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + jobName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the job to launch', + }, + runConfigJson: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Run configuration as a JSON object (optional)', + }, + tags: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Tags as a JSON array of {key, value} objects (optional)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const variables: Record = { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + jobName: params.jobName, + } + + let hasConfig = false + if (params.runConfigJson) { + try { + variables.runConfigData = JSON.parse(params.runConfigJson) + hasConfig = true + } catch { + throw new Error('Invalid JSON in runConfigJson') + } + } + + let hasTags = false + if (params.tags) { + try { + variables.tags = JSON.parse(params.tags) + hasTags = true + } catch { + throw new Error('Invalid JSON in tags') + } + } + + return { + query: buildLaunchRunMutation(hasConfig, hasTags), + variables, + } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ launchRun?: unknown }>(response) + + const result = data.data?.launchRun as LaunchRunResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'LaunchRunSuccess' && result.run) { + return { + success: true, + output: { runId: result.run.runId }, + } + } + + throw new Error( + `${result.type}: ${dagsterUnionErrorMessage(result, 'Launch run failed')}` + ) + }, + + outputs: { + runId: { + type: 'string', + description: 'The globally unique ID of the launched run', + }, + }, +} diff --git a/apps/sim/tools/dagster/list_jobs.ts b/apps/sim/tools/dagster/list_jobs.ts new file mode 100644 index 0000000000..aab476405e --- /dev/null +++ b/apps/sim/tools/dagster/list_jobs.ts @@ -0,0 +1,99 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' +import type { DagsterBaseParams, DagsterListJobsResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +const LIST_JOBS_QUERY = ` + query ListJobNames { + repositoriesOrError { + ... on RepositoryConnection { + nodes { + name + jobs { + name + } + } + } + ... on Error { + __typename + message + } + } + } +` + +export const listJobsTool: ToolConfig = { + id: 'dagster_list_jobs', + name: 'Dagster List Jobs', + description: 'List all jobs across repositories in a Dagster instance.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: () => ({ + query: LIST_JOBS_QUERY, + variables: {}, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ repositoriesOrError?: unknown }>(response) + + const result = data.data?.repositoriesOrError as + | { nodes?: Array<{ name: string; jobs?: Array<{ name: string }> }>; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (!Array.isArray(result.nodes)) { + throw new Error(dagsterUnionErrorMessage(result, 'List jobs failed')) + } + + const jobs: Array<{ name: string; repositoryName: string }> = [] + + for (const repo of result.nodes) { + for (const job of repo.jobs ?? []) { + jobs.push({ + name: job.name, + repositoryName: repo.name, + }) + } + } + + return { + success: true, + output: { jobs }, + } + }, + + outputs: { + jobs: { + type: 'json', + description: 'Array of jobs with name and repositoryName', + properties: { + name: { type: 'string', description: 'Job name' }, + repositoryName: { type: 'string', description: 'Repository name' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/list_runs.ts b/apps/sim/tools/dagster/list_runs.ts new file mode 100644 index 0000000000..646e6beb6b --- /dev/null +++ b/apps/sim/tools/dagster/list_runs.ts @@ -0,0 +1,151 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' +import type { DagsterListRunsParams, DagsterListRunsResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +/** Shape of each run in the `runsOrError` → `Runs.results` GraphQL selection set. */ +interface DagsterListRunsGraphqlRow { + runId: string + jobName: string | null + status: string + tags: Array<{ key: string; value: string }> | null + startTime: number | null + endTime: number | null +} + +function buildListRunsQuery(hasFilter: boolean) { + return ` + query ListRuns($limit: Int${hasFilter ? ', $filter: RunsFilter' : ''}) { + runsOrError(limit: $limit${hasFilter ? ', filter: $filter' : ''}) { + ... on Runs { + results { + runId + jobName + status + tags { + key + value + } + startTime + endTime + } + } + ... on Error { + __typename + message + } + } + } + ` +} + +export const listRunsTool: ToolConfig = { + id: 'dagster_list_runs', + name: 'Dagster List Runs', + description: 'List recent Dagster runs, optionally filtered by job name.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + jobName: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter runs by job name (optional)', + }, + statuses: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Comma-separated run statuses to filter by, e.g. "SUCCESS,FAILURE" (optional)', + }, + limit: { + type: 'number', + required: false, + visibility: 'user-or-llm', + description: 'Maximum number of runs to return (default 20)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const filter: Record = {} + if (params.jobName) filter.pipelineName = params.jobName + if (params.statuses) { + filter.statuses = params.statuses + .split(',') + .map((s: string) => s.trim()) + .filter(Boolean) + } + + const hasFilter = Object.keys(filter).length > 0 + const variables: Record = { limit: params.limit ?? 20 } + if (hasFilter) variables.filter = filter + + return { + query: buildListRunsQuery(hasFilter), + variables, + } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ runsOrError?: unknown }>(response) + + const result = data.data?.runsOrError as + | { results?: DagsterListRunsGraphqlRow[]; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (!Array.isArray(result.results)) { + throw new Error(dagsterUnionErrorMessage(result, 'Dagster returned an error listing runs')) + } + + const runs = result.results.map((r: DagsterListRunsGraphqlRow) => ({ + runId: r.runId, + jobName: r.jobName ?? null, + status: r.status, + tags: r.tags ?? null, + startTime: r.startTime ?? null, + endTime: r.endTime ?? null, + })) + + return { + success: true, + output: { runs }, + } + }, + + outputs: { + runs: { + type: 'json', + description: 'Array of runs', + properties: { + runId: { type: 'string', description: 'Run ID' }, + jobName: { type: 'string', description: 'Job name' }, + status: { type: 'string', description: 'Run status' }, + tags: { type: 'json', description: 'Run tags as array of {key, value} objects' }, + startTime: { type: 'number', description: 'Start time as Unix timestamp' }, + endTime: { type: 'number', description: 'End time as Unix timestamp' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/terminate_run.ts b/apps/sim/tools/dagster/terminate_run.ts new file mode 100644 index 0000000000..c79d6ccfb4 --- /dev/null +++ b/apps/sim/tools/dagster/terminate_run.ts @@ -0,0 +1,131 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' +import type { DagsterTerminateRunParams, DagsterTerminateRunResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +/** Fields returned from `terminateRun` for success and `Error` union members. */ +interface DagsterTerminateRunPayload { + __typename?: string + run?: { runId: string } + message?: string +} + +const TERMINATE_RUN_MUTATION = ` + mutation TerminateRun($runId: String!) { + terminateRun(runId: $runId) { + ... on TerminateRunSuccess { + run { + runId + } + } + ... on Error { + __typename + message + } + } + } +` + +export const terminateRunTool: ToolConfig = + { + id: 'dagster_terminate_run', + name: 'Dagster Terminate Run', + description: 'Terminate an in-progress Dagster run.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to terminate', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: TERMINATE_RUN_MUTATION, + variables: { runId: params.runId }, + }), + }, + + transformResponse: async (response: Response, params?: DagsterTerminateRunParams) => { + const data = await parseDagsterGraphqlResponse<{ terminateRun?: DagsterTerminateRunPayload }>( + response + ) + + const result = data.data?.terminateRun + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.run?.runId) { + return { + success: true, + output: { + success: true, + runId: result.run.runId, + message: null, + }, + } + } + + if (typeof result.message === 'string') { + if (result.__typename === 'RunNotFoundError') { + throw new Error(result.message) + } + if (result.__typename === 'TerminateRunFailure') { + const runId = params?.runId + if (!runId) { + throw new Error( + 'Terminate run failed but runId was not available in tool context; ensure the tool is invoked with runId.' + ) + } + return { + success: true, + output: { + success: false, + runId, + message: result.message, + }, + } + } + throw new Error(result.message) + } + + throw new Error(dagsterUnionErrorMessage(result, 'Terminate run failed')) + }, + + outputs: { + success: { + type: 'boolean', + description: 'Whether the run was successfully terminated', + }, + runId: { + type: 'string', + description: 'The ID of the terminated run', + }, + message: { + type: 'string', + description: 'Error or status message if termination failed', + optional: true, + }, + }, + } diff --git a/apps/sim/tools/dagster/types.ts b/apps/sim/tools/dagster/types.ts new file mode 100644 index 0000000000..d0951518ad --- /dev/null +++ b/apps/sim/tools/dagster/types.ts @@ -0,0 +1,92 @@ +import type { ToolResponse } from '@/tools/types' + +/** Base parameters shared by all Dagster tools */ +export interface DagsterBaseParams { + /** Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001) */ + host: string + /** Dagster+ API token */ + apiKey?: string +} + +/** Launch Run */ +export interface DagsterLaunchRunParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + jobName: string + runConfigJson?: string + tags?: string +} + +export interface DagsterLaunchRunResponse extends ToolResponse { + output: { + runId: string + } +} + +/** Get Run */ +export interface DagsterGetRunParams extends DagsterBaseParams { + runId: string +} + +export interface DagsterGetRunResponse extends ToolResponse { + output: { + runId: string + jobName: string | null + status: string + startTime: number | null + endTime: number | null + runConfigYaml: string | null + tags: Array<{ key: string; value: string }> | null + } +} + +/** List Runs */ +export interface DagsterListRunsParams extends DagsterBaseParams { + jobName?: string + statuses?: string + limit?: number +} + +export interface DagsterListRunsResponse extends ToolResponse { + output: { + runs: Array<{ + runId: string + jobName: string | null + status: string + tags: Array<{ key: string; value: string }> | null + startTime: number | null + endTime: number | null + }> + } +} + +/** List Jobs */ +export interface DagsterListJobsResponse extends ToolResponse { + output: { + jobs: Array<{ + name: string + repositoryName: string + }> + } +} + +/** Terminate Run */ +export interface DagsterTerminateRunParams extends DagsterBaseParams { + runId: string +} + +export interface DagsterTerminateRunResponse extends ToolResponse { + output: { + success: boolean + runId: string + message: string | null + } +} + +/** Union type for all Dagster responses */ +export type DagsterResponse = + | DagsterLaunchRunResponse + | DagsterGetRunResponse + | DagsterListRunsResponse + | DagsterListJobsResponse + | DagsterTerminateRunResponse diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index 7508d98dac..f270931e39 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -339,6 +339,13 @@ import { cursorStopAgentTool, cursorStopAgentV2Tool, } from '@/tools/cursor' +import { + dagsterGetRunTool, + dagsterLaunchRunTool, + dagsterListJobsTool, + dagsterListRunsTool, + dagsterTerminateRunTool, +} from '@/tools/dagster' import { databricksCancelRunTool, databricksExecuteSqlTool, @@ -3397,6 +3404,11 @@ export const tools: Record = { devin_get_session: devinGetSessionTool, devin_list_sessions: devinListSessionsTool, devin_send_message: devinSendMessageTool, + dagster_get_run: dagsterGetRunTool, + dagster_launch_run: dagsterLaunchRunTool, + dagster_list_jobs: dagsterListJobsTool, + dagster_list_runs: dagsterListRunsTool, + dagster_terminate_run: dagsterTerminateRunTool, databricks_cancel_run: databricksCancelRunTool, databricks_execute_sql: databricksExecuteSqlTool, databricks_get_run: databricksGetRunTool, diff --git a/apps/sim/tools/utils.server.ts b/apps/sim/tools/utils.server.ts index c4e06cf94b..fbf1249a3f 100644 --- a/apps/sim/tools/utils.server.ts +++ b/apps/sim/tools/utils.server.ts @@ -77,7 +77,7 @@ export async function executeRequest( output: await resp.json(), })) - return await transformResponse(resolvedResponse as Response) + return await transformResponse(resolvedResponse as Response, requestParams.toolParams as never) } catch (error: any) { return { success: false, diff --git a/apps/sim/tools/utils.test.ts b/apps/sim/tools/utils.test.ts index 43a5531da9..786dc1165c 100644 --- a/apps/sim/tools/utils.test.ts +++ b/apps/sim/tools/utils.test.ts @@ -150,6 +150,7 @@ describe('formatRequestParams', () => { method: 'GET', headers: { 'Content-Type': 'application/json' }, body: undefined, // No body for GET + toolParams: params, }) expect(mockTool.request.headers).toHaveBeenCalledWith(params) @@ -166,6 +167,7 @@ describe('formatRequestParams', () => { method: 'GET', headers: { 'Content-Type': 'application/json' }, body: undefined, + toolParams: params, }) }) diff --git a/apps/sim/tools/utils.ts b/apps/sim/tools/utils.ts index 2f944c18bd..e0a7bd5e10 100644 --- a/apps/sim/tools/utils.ts +++ b/apps/sim/tools/utils.ts @@ -74,6 +74,8 @@ export interface RequestParams { headers: Record body?: string timeout?: number + /** Original tool invocation params; forwarded to `transformResponse` when using `executeRequest`. */ + toolParams?: Record } /** @@ -129,7 +131,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record