activateJobs allows you to manually activate jobs, effectively building a worker, rather than using the ZBWorker class.
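As a rough illustration of what "building a worker" by hand involves, the sketch below activates one batch of jobs, runs a handler on each, and completes it. The `Job` and `JobClient` types, the `processBatch` helper, and the in-memory `fakeClient` are hypothetical stand-ins written for this example so it runs without a broker; they are not the SDK's own type definitions, and the real request and response shapes may differ.

```typescript
// Local stand-in types: NOT the SDK's own definitions (assumption for illustration).
interface Job {
  key: string
  variables: Record<string, unknown>
}

interface JobClient {
  activateJobs(req: {
    type: string
    worker: string
    timeout: number
    maxJobsToActivate: number
  }): Promise<Job[]>
  completeJob(req: {
    jobKey: string
    variables: Record<string, unknown>
  }): Promise<void>
}

// Activate one batch of jobs, run the handler on each, and complete it.
// Returns the number of jobs completed in this pass.
async function processBatch(
  client: JobClient,
  handler: (job: Job) => Record<string, unknown>
): Promise<number> {
  const jobs = await client.activateJobs({
    type: 'demo-service',
    worker: 'manual-worker',
    timeout: 30000,
    maxJobsToActivate: 5,
  })
  for (const job of jobs) {
    await client.completeJob({ jobKey: job.key, variables: handler(job) })
  }
  return jobs.length
}

// In-memory fake, used only to make the sketch runnable without a broker.
const completed: string[] = []
const fakeClient: JobClient = {
  activateJobs: async () => [
    { key: '1', variables: { n: 1 } },
    { key: '2', variables: { n: 2 } },
  ],
  completeJob: async ({ jobKey }) => {
    completed.push(jobKey)
  },
}
```

A real manual worker would call something like `processBatch` in a polling loop and would also need to fail jobs when the handler throws, which is what the ZBWorker class otherwise does for you.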
Gracefully shut down all workers, draining existing tasks, and return when it is safe to exit.
Optional timeout: number
Explicitly complete a job. This method is useful for manually constructing a worker.
Create a new process instance. Asynchronously returns a process instance id.
const zbc = new ZeebeGrpcClient()
zbc.createProcessInstance({
  bpmnProcessId: 'onboarding-process',
  variables: {
    customerId: 'uuid-3455'
  },
  version: 5 // optional, will use latest by default
}).then(res => console.log(JSON.stringify(res, null, 2)))

zbc.createProcessInstance({
  bpmnProcessId: 'SkipFirstTask',
  variables: { id: random },
  startInstructions: [{elementId: 'second_service_task'}]
}).then(res => (id = res.processInstanceKey))
Create a process instance, and return a Promise that returns the outcome of the process.
Create a worker that polls the gateway for jobs and executes a job handler when units of work are available.
const zbc = new ZB.ZeebeGrpcClient()
const zbWorker = zbc.createWorker({
  taskType: 'demo-service',
  taskHandler: myTaskHandler,
})

// A job handler must return one of job.complete, job.fail, job.error, or job.forward
// Note: unhandled exceptions in the job handler cause the library to call job.fail
async function myTaskHandler(job) {
  zbWorker.log('Task variables', job.variables)

  // Task worker business logic goes here
  const updateToBrokerVariables = {
    updatedProperty: 'newValue',
  }

  const res = await callExternalSystem(job.variables)

  if (res.code === 'SUCCESS') {
    return job.complete({
      ...updateToBrokerVariables,
      ...res.values
    })
  }
  if (res.code === 'BUSINESS_ERROR') {
    return job.error({
      code: res.errorCode,
      message: res.message
    })
  }
  if (res.code === 'ERROR') {
    return job.fail({
      errorMessage: res.message,
      retryBackOff: 2000
    })
  }
}
Delete a resource.
The key of the resource that should be deleted. This can either be the key of a process definition, the key of a decision requirements definition or the key of a form.
Deploys a single resource (e.g. process or decision model) to Zeebe.
Errors: PERMISSION_DENIED:
Deploys one or more resources (e.g. processes or decision models) to Zeebe. Note that this is an atomic call, i.e. either all resources are deployed, or none of them are.
Errors: PERMISSION_DENIED:
Optional tenantId: string
import fs from 'fs'

const zbc = new ZeebeGrpcClient()
const result = await zbc.deployResources([
  {
    processFilename: './src/__tests__/testdata/Client-DeployWorkflow.bpmn',
  },
  {
    decisionFilename: './src/__tests__/testdata/quarantine-duration.dmn',
  },
  {
    // A form is passed as a buffer together with its resource name
    form: fs.readFileSync('./src/__tests__/testdata/form_1.form'),
    name: 'form_1.form',
  },
])
Evaluates a decision. The decision to evaluate can be specified either by using its unique key (as returned by DeployResource), or using the decision ID. When using the decision ID, the latest deployed version of the decision is used.
Fail a job. This is useful if you are using the decoupled completion pattern or building your own worker. For the retry count, the current count is available in the job metadata.
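To make the retry-count remark concrete, here is a minimal sketch of deriving a fail request from the current job metadata. `JobMetadata`, `FailJobRequest`, and `buildFailRequest` are hypothetical names defined for this example, and the field names (`jobKey`, `retries`, `errorMessage`, `retryBackOff`) are assumed from the examples elsewhere on this page rather than taken from the SDK's type definitions.

```typescript
// Stand-in types for illustration only (assumed field names).
interface JobMetadata {
  key: string
  retries: number // retries remaining, as reported in the job metadata
}

interface FailJobRequest {
  jobKey: string
  retries: number
  errorMessage: string
  retryBackOff?: number // milliseconds to wait before the next attempt
}

// Build a fail request that decrements the remaining retries, never going below zero.
function buildFailRequest(
  job: JobMetadata,
  errorMessage: string,
  retryBackOff?: number
): FailJobRequest {
  return {
    jobKey: job.key,
    retries: Math.max(0, job.retries - 1),
    errorMessage,
    ...(retryBackOff !== undefined ? { retryBackOff } : {}),
  }
}
```

The resulting object would then be passed to the client's fail call. Once the retry count reaches zero, a further failure raises an incident, which is the situation the updateJobRetries and resolveIncident methods address.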
Return an array of task types contained in a BPMN file or array of BPMN files. This can be useful, for example, to do
Modify a running process instance. This allows you to move the execution tokens, and change the variables. Added in 8.1. See the gRPC protocol documentation.
zbc.createProcessInstance({
  bpmnProcessId: 'SkipFirstTask',
  variables: {}
}).then(res =>
  zbc.modifyProcessInstance({
    processInstanceKey: res.processInstanceKey,
    activateInstructions: [{
      elementId: 'second_service_task',
      ancestorElementInstanceKey: "-1",
      variableInstructions: [{
        scopeId: '',
        variables: { second: 1 }
      }]
    }]
  })
)
Publish a message to the broker for correlation with a workflow instance. See this tutorial for a detailed description of message correlation.
Publish a message to the broker for correlation with a workflow message start event. For a message targeting a start event, the correlation key is not needed to target a specific running process instance. However, the hash of the correlationKey is used to determine the partition where this workflow will start. So we assign a random uuid to balance workflow instances created via start message across partitions.
We make the correlationKey optional, because the caller can specify a correlationKey + messageId to guarantee an idempotent message.
Multiple messages with the same correlationKey + messageId combination will only start a workflow once. See: https://github.com/zeebe-io/zeebe/issues/1012 and https://github.com/zeebe-io/zeebe/issues/1022
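The defaulting behaviour described above might be sketched as follows. This is not the library's implementation: `StartMessage` and `withCorrelationKey` are hypothetical names introduced here, and the only point illustrated is that a caller-supplied correlationKey is kept while a missing one is replaced by a random UUID.

```typescript
import { randomUUID } from 'node:crypto'

// Stand-in request shape for illustration (not the SDK's type).
interface StartMessage {
  name: string
  variables: Record<string, unknown>
  correlationKey?: string
  messageId?: string
}

// Keep an explicit correlationKey; otherwise assign a random UUID so that
// instances started via the message are spread across partitions.
function withCorrelationKey(
  msg: StartMessage
): StartMessage & { correlationKey: string } {
  return { ...msg, correlationKey: msg.correlationKey ?? randomUUID() }
}
```

With a random key per call, repeated publishes are not deduplicated. A caller who wants idempotency supplies a stable correlationKey and messageId pair.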
const zbc = new ZeebeGrpcClient()
zbc.publishStartMessage({
  name: 'Start_New_Onboarding_Flow',
  variables: {
    customerId: 'uuid-348-234-8908'
  }
})

// To do the same in an idempotent fashion - note: only idempotent during the lifetime of the created instance.
zbc.publishStartMessage({
  name: 'Start_New_Onboarding_Flow',
  messageId: 'uuid-348-234-8908', // use customerId to make process idempotent per customer
  variables: {
    customerId: 'uuid-348-234-8908'
  }
})
Resolve an incident by incident key.
type JSONObject = {[key: string]: string | number | boolean | JSONObject}

const zbc = new ZeebeGrpcClient()

async function updateAndResolveIncident({
  processInstanceId,
  incidentKey,
  jobKey,
  variables
} : {
  processInstanceId: string,
  incidentKey: string,
  jobKey: string,
  variables: JSONObject
}) {
  // Fix the process state first
  await zbc.setVariables({
    elementInstanceKey: processInstanceId,
    variables
  })
  // Give the failed job a retry so it can run again
  await zbc.updateJobRetries({
    jobKey,
    retries: 1
  })
  // Then resolve the incident
  return zbc.resolveIncident({
    incidentKey
  })
}
Directly modify the variables in a process instance. This can be used with resolveIncident to update the process and resolve an incident.
type JSONObject = {[key: string]: string | number | boolean | JSONObject}

const zbc = new ZeebeGrpcClient()

async function updateAndResolveIncident({
  incidentKey,
  processInstanceKey,
  jobKey,
  variableUpdate
} : {
  incidentKey: string
  processInstanceKey: string
  jobKey: string
  variableUpdate: JSONObject
}) {
  await zbc.setVariables({
    elementInstanceKey: processInstanceKey,
    variables: variableUpdate
  })
  await zbc.updateJobRetries({
    jobKey,
    retries: 1
  })
  return zbc.resolveIncident({
    incidentKey
  })
}
Create a worker that uses the StreamActivatedJobs RPC to activate jobs.
NOTE: This will only stream jobs created after the worker is started.
To activate existing jobs, use activateJobs or createWorker.
const zbc = new ZB.ZeebeGrpcClient()
const zbStreamWorker = zbc.streamJobs({
  type: 'demo-service',
  worker: 'my-worker-uuid',
  taskHandler: myTaskHandler,
  timeout: 30000 // 30 seconds
})

// ...

// Close the worker stream when done
zbStreamWorker.close()

// A job handler must return one of job.complete, job.fail, job.error, or job.forward
// Note: unhandled exceptions in the job handler cause the library to call job.fail
async function myTaskHandler(job) {
  console.log('Task variables', job.variables)

  // Task worker business logic goes here
  const updateToBrokerVariables = {
    updatedProperty: 'newValue',
  }

  const res = await callExternalSystem(job.variables)

  if (res.code === 'SUCCESS') {
    return job.complete({
      ...updateToBrokerVariables,
      ...res.values
    })
  }
  if (res.code === 'BUSINESS_ERROR') {
    return job.error({
      code: res.errorCode,
      message: res.message
    })
  }
  if (res.code === 'ERROR') {
    return job.fail({
      errorMessage: res.message,
      retryBackOff: 2000
    })
  }
}
Fail a job by throwing a business error (i.e. non-technical) that occurs while processing a job.
The error is handled in the workflow by an error catch event.
If there is no error catch event with the specified errorCode, then an incident will be raised instead.
This method is useful when building a worker, for example for the decoupled completion pattern.
type JSONObject = {[key: string]: string | number | boolean | JSONObject}

interface errorResult {
  resultType: 'ERROR'
  errorCode: string
  errorMessage: string
}

interface successResult {
  resultType: 'SUCCESS'
  variableUpdate: JSONObject
}

type Result = errorResult | successResult

const zbc = new ZeebeGrpcClient()

// This could be a listener on a return queue from an external system
async function handleJob(jobKey: string, result: Result) {
  // Checking resultType narrows the union, so the fields below are type-safe
  if (result.resultType === 'ERROR') {
    const { errorMessage, errorCode } = result
    return zbc.throwError({
      jobKey,
      errorCode,
      errorMessage
    })
  } else {
    return zbc.completeJob({
      jobKey,
      variables: result.variableUpdate
    })
  }
}
Return the broker cluster topology.
Update the number of retries for a Job. This is useful if a job has zero remaining retries and fails, raising an incident.
type JSONObject = {[key: string]: string | number | boolean | JSONObject}

const zbc = new ZeebeGrpcClient()

async function updateAndResolveIncident({
  incidentKey,
  processInstanceKey,
  jobKey,
  variableUpdate
} : {
  incidentKey: string
  processInstanceKey: string
  jobKey: string
  variableUpdate: JSONObject
}) {
  await zbc.setVariables({
    elementInstanceKey: processInstanceKey,
    variables: variableUpdate
  })
  await zbc.updateJobRetries({
    jobKey,
    retries: 1
  })
  return zbc.resolveIncident({
    incidentKey
  })
}
Updates the deadline of a job using the timeout (in ms) provided. This can be used for extending or shortening the job deadline.
Errors:
NOT_FOUND: no job exists with the given key
INVALID_STATE: no deadline exists for the given job key
A client for interacting with a Zeebe broker. With the connection credentials set in the environment, you can use a "zero-conf" constructor with no arguments. All constructor parameters for configuration are optional. If no configuration is provided, the SDK will use environment variables to configure itself. See CamundaSDKConfiguration for the complete list of configuration parameters. Values can be passed in explicitly in code, or set via environment variables (recommended: separate configuration and application logic). Explicitly set values will override environment variables, which are merged into the configuration.
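The precedence rule described above (explicit values win over environment variables) can be sketched in a few lines. This is an illustration of the merge order only, not the SDK's configuration loader: `resolveConfig` is a hypothetical helper, and `EXAMPLE_ADDRESS` and `EXAMPLE_CLIENT_ID` are placeholder keys, not real configuration parameter names. See CamundaSDKConfiguration for the actual list.

```typescript
type Config = Record<string, string>

// Resolve each known key from explicit values first, falling back to the environment.
// Keys that are set in neither place are left out of the result.
function resolveConfig(
  env: Record<string, string | undefined>,
  explicit: Partial<Config>,
  keys: string[]
): Config {
  const resolved: Config = {}
  for (const key of keys) {
    const value = explicit[key] ?? env[key]
    if (value !== undefined) {
      resolved[key] = value
    }
  }
  return resolved
}

// Placeholder keys, for illustration only.
const exampleEnv = { EXAMPLE_ADDRESS: 'from-env:26500', EXAMPLE_CLIENT_ID: 'env-client' }
const exampleConfig = resolveConfig(
  exampleEnv,
  { EXAMPLE_ADDRESS: 'from-code:26500' },
  ['EXAMPLE_ADDRESS', 'EXAMPLE_CLIENT_ID', 'EXAMPLE_UNSET']
)
```

Here `exampleConfig.EXAMPLE_ADDRESS` comes from the explicit value and `exampleConfig.EXAMPLE_CLIENT_ID` from the environment, which mirrors the "zero-conf" case when no explicit values are passed at all.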
Example