Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,22 @@ const WorkflowContent = React.memo(
const { fitViewToBounds, getViewportCenter } = useCanvasViewport(reactFlowInstance, {
embedded,
})
const { emitCursorUpdate } = useSocket()
const { emitCursorUpdate, joinWorkflow, leaveWorkflow } = useSocket()
useDynamicHandleRefresh()

const workspaceId = propWorkspaceId || (params.workspaceId as string)
const workflowIdParam = propWorkflowId || (params.workflowId as string)

const addNotification = useNotificationStore((state) => state.addNotification)

useEffect(() => {
if (!embedded || !workflowIdParam) return
joinWorkflow(workflowIdParam)
return () => {
leaveWorkflow()
}
}, [embedded, workflowIdParam, joinWorkflow, leaveWorkflow])

useOAuthReturnForWorkflow(workflowIdParam)

const {
Expand Down Expand Up @@ -2144,12 +2152,9 @@ const WorkflowContent = React.memo(

const handleCanvasPointerMove = useCallback(
(event: React.PointerEvent<Element>) => {
const target = event.currentTarget as HTMLElement
const bounds = target.getBoundingClientRect()

const position = screenToFlowPosition({
x: event.clientX - bounds.left,
y: event.clientY - bounds.top,
x: event.clientX,
y: event.clientY,
})

emitCursorUpdate(position)
Expand Down
16 changes: 15 additions & 1 deletion apps/sim/app/workspace/providers/socket-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ interface SocketContextType {
onSelectionUpdate: (handler: (data: any) => void) => void
onWorkflowDeleted: (handler: (data: any) => void) => void
onWorkflowReverted: (handler: (data: any) => void) => void
onWorkflowUpdated: (handler: (data: any) => void) => void
onOperationConfirmed: (handler: (data: any) => void) => void
onOperationFailed: (handler: (data: any) => void) => void
}
Expand Down Expand Up @@ -118,6 +119,7 @@ const SocketContext = createContext<SocketContextType>({
onSelectionUpdate: () => {},
onWorkflowDeleted: () => {},
onWorkflowReverted: () => {},
onWorkflowUpdated: () => {},
onOperationConfirmed: () => {},
onOperationFailed: () => {},
})
Expand Down Expand Up @@ -155,6 +157,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
selectionUpdate?: (data: any) => void
workflowDeleted?: (data: any) => void
workflowReverted?: (data: any) => void
workflowUpdated?: (data: any) => void
operationConfirmed?: (data: any) => void
operationFailed?: (data: any) => void
}>({})
Expand Down Expand Up @@ -334,7 +337,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
isRejoiningRef.current = false
// Ignore stale success responses from previous navigation
if (workflowId !== urlWorkflowIdRef.current) {
if (urlWorkflowIdRef.current && workflowId !== urlWorkflowIdRef.current) {
logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`)
return
}
Expand Down Expand Up @@ -382,6 +385,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowReverted?.(data)
})

socketInstance.on('workflow-updated', (data) => {
logger.info(`Workflow ${data.workflowId} has been updated externally`)
eventHandlers.current.workflowUpdated?.(data)
})

const rehydrateWorkflowStores = async (workflowId: string, workflowState: any) => {
const [
{ useOperationQueueStore },
Expand Down Expand Up @@ -803,6 +811,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowReverted = handler
}, [])

const onWorkflowUpdated = useCallback((handler: (data: any) => void) => {
eventHandlers.current.workflowUpdated = handler
}, [])

const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
eventHandlers.current.operationConfirmed = handler
}, [])
Expand Down Expand Up @@ -836,6 +848,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onSelectionUpdate,
onWorkflowDeleted,
onWorkflowReverted,
onWorkflowUpdated,
onOperationConfirmed,
onOperationFailed,
}),
Expand Down Expand Up @@ -863,6 +876,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onSelectionUpdate,
onWorkflowDeleted,
onWorkflowReverted,
onWorkflowUpdated,
onOperationConfirmed,
onOperationFailed,
]
Expand Down
153 changes: 87 additions & 66 deletions apps/sim/hooks/use-collaborative-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export function useCollaborativeWorkflow() {
onVariableUpdate,
onWorkflowDeleted,
onWorkflowReverted,
onWorkflowUpdated,
onOperationConfirmed,
onOperationFailed,
} = useSocket()
Expand Down Expand Up @@ -536,81 +537,99 @@ export function useCollaborativeWorkflow() {
}
}

const reloadWorkflowFromApi = async (workflowId: string, reason: string): Promise<boolean> => {
const response = await fetch(`/api/workflows/${workflowId}`)
if (!response.ok) {
logger.error(`Failed to fetch workflow data after ${reason}: ${response.statusText}`)
return false
}

const responseData = await response.json()
const workflowData = responseData.data

if (!workflowData?.state) {
logger.error(`No state found in workflow data after ${reason}`, { workflowData })
return false
}

isApplyingRemoteChange.current = true
try {
useWorkflowStore.getState().replaceWorkflowState({
blocks: workflowData.state.blocks || {},
edges: workflowData.state.edges || [],
loops: workflowData.state.loops || {},
parallels: workflowData.state.parallels || {},
lastSaved: workflowData.state.lastSaved || Date.now(),
})

const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})

useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowId]: subblockValues,
},
}))

const graph = {
blocksById: workflowData.state.blocks || {},
edgesById: Object.fromEntries(
(workflowData.state.edges || []).map((e: any) => [e.id, e])
),
}

const undoRedoStore = useUndoRedoStore.getState()
const stackKeys = Object.keys(undoRedoStore.stacks)
stackKeys.forEach((key) => {
const [wfId, userId] = key.split(':')
if (wfId === workflowId) {
undoRedoStore.pruneInvalidEntries(wfId, userId, graph)
}
})

logger.info(`Successfully reloaded workflow state after ${reason}`, { workflowId })
return true
} finally {
isApplyingRemoteChange.current = false
}
}

const handleWorkflowReverted = async (data: any) => {
const { workflowId } = data
logger.info(`Workflow ${workflowId} has been reverted to deployed state`)

// If the reverted workflow is the currently active one, reload the workflow state
if (activeWorkflowId === workflowId) {
logger.info(`Currently active workflow ${workflowId} was reverted, reloading state`)

try {
// Fetch the updated workflow state from the server (which loads from normalized tables)
const response = await fetch(`/api/workflows/${workflowId}`)
if (response.ok) {
const responseData = await response.json()
const workflowData = responseData.data

if (workflowData?.state) {
// Update the workflow store with the reverted state
isApplyingRemoteChange.current = true
try {
// Update the main workflow state using the API response
useWorkflowStore.getState().replaceWorkflowState({
blocks: workflowData.state.blocks || {},
edges: workflowData.state.edges || [],
loops: workflowData.state.loops || {},
parallels: workflowData.state.parallels || {},
lastSaved: workflowData.state.lastSaved || Date.now(),
})
if (activeWorkflowId !== workflowId) return

// Update subblock store with reverted values
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})
try {
await reloadWorkflowFromApi(workflowId, 'revert')
} catch (error) {
logger.error('Error reloading workflow state after revert:', error)
}
}

// Update subblock store for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowId]: subblockValues,
},
}))
const handleWorkflowUpdated = async (data: any) => {
const { workflowId } = data
logger.info(`Workflow ${workflowId} has been updated externally`)

logger.info(`Successfully loaded reverted workflow state for ${workflowId}`)
if (activeWorkflowId !== workflowId) return

const graph = {
blocksById: workflowData.state.blocks || {},
edgesById: Object.fromEntries(
(workflowData.state.edges || []).map((e: any) => [e.id, e])
),
}
const { hasActiveDiff } = useWorkflowDiffStore.getState()
if (hasActiveDiff) {
logger.info('Skipping workflow-updated: active diff in progress', { workflowId })
return
}

const undoRedoStore = useUndoRedoStore.getState()
const stackKeys = Object.keys(undoRedoStore.stacks)
stackKeys.forEach((key) => {
const [wfId, userId] = key.split(':')
if (wfId === workflowId) {
undoRedoStore.pruneInvalidEntries(wfId, userId, graph)
}
})
} finally {
isApplyingRemoteChange.current = false
}
} else {
logger.error('No state found in workflow data after revert', { workflowData })
}
} else {
logger.error(`Failed to fetch workflow data after revert: ${response.statusText}`)
}
} catch (error) {
logger.error('Error reloading workflow state after revert:', error)
}
try {
await reloadWorkflowFromApi(workflowId, 'external update')
} catch (error) {
logger.error('Error reloading workflow state after external update:', error)
}
}

Expand All @@ -632,6 +651,7 @@ export function useCollaborativeWorkflow() {
onVariableUpdate(handleVariableUpdate)
onWorkflowDeleted(handleWorkflowDeleted)
onWorkflowReverted(handleWorkflowReverted)
onWorkflowUpdated(handleWorkflowUpdated)
onOperationConfirmed(handleOperationConfirmed)
onOperationFailed(handleOperationFailed)
}, [
Expand All @@ -640,6 +660,7 @@ export function useCollaborativeWorkflow() {
onVariableUpdate,
onWorkflowDeleted,
onWorkflowReverted,
onWorkflowUpdated,
onOperationConfirmed,
onOperationFailed,
activeWorkflowId,
Expand Down
13 changes: 13 additions & 0 deletions apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
type BaseServerTool,
type ServerToolContext,
} from '@/lib/copilot/tools/server/base-tool'
import { env } from '@/lib/core/config/env'
import { applyTargetedLayout, getTargetedLayoutImpact } from '@/lib/workflows/autolayout'
import {
DEFAULT_HORIZONTAL_SPACING,
Expand Down Expand Up @@ -287,6 +288,18 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, unknown>

logger.info('Workflow state persisted to database', { workflowId })

const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
fetch(`${socketUrl}/api/workflow-updated`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
body: JSON.stringify({ workflowId }),
}).catch((error) => {
logger.warn('Failed to notify socket server of workflow update', { workflowId, error })
})

const sanitizationWarnings = validation.warnings.length > 0 ? validation.warnings : undefined

return {
Expand Down
Loading