Description

A client for interacting with a Zeebe broker. With the connection credentials set in the environment, you can use a "zero-conf" constructor with no arguments.

Example

const zbc = new ZeebeGrpcClient()
zbc.topology().then(info =>
console.log(JSON.stringify(info, null, 2))
)

Constructors

Properties

closePromise?: Promise<null>
closing: boolean = false
connected?: boolean = undefined
connectionTolerance: MaybeTimeDuration
customSSL?: CustomSSL
gatewayAddress: string
grpc: Promise<ZBGrpc>
loglevel: Loglevel
maxRetries: number
maxRetryTimeout: MaybeTimeDuration
oAuthProvider: IOAuthProvider
onConnectionError?: ((err) => void)

Type declaration

    • (err): void
    • Parameters

      • err: Error

      Returns void

onReady?: (() => void)

Type declaration

    • (): void
    • Returns void

readied: boolean = false
retry: boolean
streamWorker?: ZBStreamWorker
tenantId?: string
useTLS: boolean
workerCount: number = 0
workers: ZBWorker<any, any, any>[] = []

Methods

  • Parameters

    • processInstanceKey: string
    • Optional operationReference: number | LosslessNumber

    Returns Promise<void>

    Description

    Cancel a process instance by process instance key.

    Example

    const zbc = new ZeebeGrpcClient()

    zbc.cancelProcessInstance(processInstanceKey)
    .catch(
    (e: any) => console.log(`Error cancelling instance: ${e.message}`)
    )
  • Parameters

    • Optional timeout: number

    Returns Promise<null>

    Description

    Gracefully shut down all workers, draining existing tasks, and return when it is safe to exit.

    Example

    const zbc = new ZeebeGrpcClient()

    zbc.createWorker({
    taskType:
    })

    setTimeout(async () => {
      await zbc.close()
      console.log('All work completed.')
    },
      5 * 60 * 1000 // 5 mins
    )
  • Parameters

    Returns Promise<void>

    Description

    Explicitly complete a job. The method is useful for manually constructing a worker.

    Example

    const zbc = new ZeebeGrpcClient()
    zbc.activateJobs({
    maxJobsToActivate: 5,
    requestTimeout: 6000,
    timeout: 5 * 60 * 1000,
    type: 'process-payment',
    worker: 'my-worker-uuid'
    }).then(jobs =>
      jobs.forEach(job =>
        // business logic
        zbc.completeJob({
          jobKey: job.key,
          variables: {}
        })
      )
    )
  • Parameters

    • __namedParameters: {
          grpcConfig: {
              namespace: string;
              onConnectionError?: (() => void);
              onReady?: (() => void);
              tasktype?: string;
          };
          logConfig: ZBLoggerConfig;
      }
      • grpcConfig: {
            namespace: string;
            onConnectionError?: (() => void);
            onReady?: (() => void);
            tasktype?: string;
        }
        • namespace: string
        • Optional onConnectionError?: (() => void)
            • (): void
            • Returns void

        • Optional onReady?: (() => void)
            • (): void
            • Returns void

        • Optional tasktype?: string
      • logConfig: ZBLoggerConfig

    Returns {
        grpcClient: ZBGrpc;
        log: StatefulLogInterceptor;
    }

  • Type Parameters

    Returns Promise<CreateProcessInstanceResponse>

    Description

    Create a new process instance. Asynchronously returns a process instance id.

    Example

    const zbc = new ZeebeGrpcClient()

    zbc.createProcessInstance({
    bpmnProcessId: 'onboarding-process',
    variables: {
    customerId: 'uuid-3455'
    },
    version: 5 // optional, will use latest by default
    }).then(res => console.log(JSON.stringify(res, null, 2)))

    zbc.createProcessInstance({
    bpmnProcessId: 'SkipFirstTask',
    variables: { id: random },
    startInstructions: [{elementId: 'second_service_task'}]
    }).then(res => (id = res.processInstanceKey))
  • Delete a resource.

    Parameters

    • resourceId: {
          operationReference?: number | LosslessNumber;
          resourceKey: string;
      }

      The key of the resource that should be deleted. This can either be the key of a process definition, the key of a decision requirements definition or the key of a form.

      • Optional operationReference?: number | LosslessNumber
      • resourceKey: string

    Returns Promise<Record<string, never>>

  • Parameters

    • resource: {
          processFilename: string;
          tenantId?: string;
      } | {
          name: string;
          process: Buffer;
          tenantId?: string;
      }

    Returns Promise<DeployResourceResponse<ProcessDeployment>>

    Description

    Deploys a single resource (e.g. process or decision model) to Zeebe.

    Errors:

    PERMISSION_DENIED:

    • if a deployment to an unauthorized tenant is performed

    INVALID_ARGUMENT:

    • no resources given.
    • if at least one resource is invalid. A resource is considered invalid if:
      • the content is not deserializable (e.g. detected as BPMN, but it's broken XML)
      • the content is invalid (e.g. an event-based gateway has an outgoing sequence flow to a task)
    • if multi-tenancy is enabled, and:
      • a tenant id is not provided
      • a tenant id with an invalid format is provided
    • if multi-tenancy is disabled and a tenant id is provided

    Example

    import {join} from 'path'
    const zbc = new ZeebeGrpcClient()

    zbc.deployResource({ processFilename: join(process.cwd(), 'bpmn', 'onboarding.bpmn') })
    zbc.deployResource({ decisionFilename: join(process.cwd(), 'dmn', 'approval.dmn')})
  • Parameters

    • resource: {
          decisionFilename: string;
          tenantId?: string;
      } | {
          decision: Buffer;
          name: string;
          tenantId?: string;
      }

    Returns Promise<DeployResourceResponse<DecisionDeployment>>

  • Parameters

    • resource: {
          formFilename: string;
          tenantId?: string;
      } | {
          form: Buffer;
          name: string;
          tenantId?: string;
      }

    Returns Promise<DeployResourceResponse<FormDeployment>>

  • Parameters

    • resources: Resource[]
    • Optional tenantId: string

    Returns Promise<DeployResourceResponse<unknown>>

    Description

    Deploys one or more resources (e.g. processes or decision models) to Zeebe. Note that this is an atomic call, i.e. either all resources are deployed, or none of them are.

    Errors:

    PERMISSION_DENIED:

    • if a deployment to an unauthorized tenant is performed

    INVALID_ARGUMENT:

    • no resources given.
    • if at least one resource is invalid. A resource is considered invalid if:
      • the content is not deserializable (e.g. detected as BPMN, but it's broken XML)
      • the content is invalid (e.g. an event-based gateway has an outgoing sequence flow to a task)
    • if multi-tenancy is enabled, and:
      • a tenant id is not provided
      • a tenant id with an invalid format is provided
    • if multi-tenancy is disabled and a tenant id is provided

    Example

    import * as fs from 'fs'

    const zbc = new ZeebeGrpcClient()

    const result = await zbc.deployResources([
    {
    processFilename: './src/__tests__/testdata/Client-DeployWorkflow.bpmn',
    },
    {
    decisionFilename: './src/__tests__/testdata/quarantine-duration.dmn',
    },
    {
    form: fs.readFileSync('./src/__tests__/testdata/form_1.form'),
    name: 'form_1.form',
    },
    ])
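
The error conditions listed above surface as gRPC status codes. A minimal sketch of turning them into readable hints, assuming the rejected promise carries a numeric `code` property using the standard gRPC numbering (3 = INVALID_ARGUMENT, 7 = PERMISSION_DENIED). `describeDeployError` is a hypothetical helper, not part of the client:

```typescript
// Standard gRPC status codes relevant to deployment failures.
const GRPC_INVALID_ARGUMENT = 3
const GRPC_PERMISSION_DENIED = 7

// Hypothetical helper: map a deployment error to a readable hint.
// Assumes the error object exposes a numeric gRPC `code`.
function describeDeployError(err: { code?: number; message?: string }): string {
  switch (err.code) {
    case GRPC_PERMISSION_DENIED:
      return 'Deployment rejected: not authorized for the target tenant'
    case GRPC_INVALID_ARGUMENT:
      return 'Deployment rejected: missing or invalid resource, or a tenant id problem'
    default:
      return `Deployment failed: ${err.message ?? 'unknown error'}`
  }
}
```

It could be attached to a deployment call with something like `.catch(e => console.error(describeDeployError(e)))`.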
  • Type Parameters

    • K extends EventKey<{
          close: "close";
          connectionError: "connectionError";
          ready: "ready";
          unknown: "unknown";
      }>

    Parameters

    • eventName: K
    • Optional params: {
          close: "close";
          connectionError: "connectionError";
          ready: "ready";
          unknown: "unknown";
      }[K]

    Returns void

  • Parameters

    Returns Promise<EvaluateDecisionResponse>

    Description

    Evaluates a decision. The decision to evaluate can be specified either by using its unique key (as returned by DeployResource), or using the decision ID. When using the decision ID, the latest deployed version of the decision is used.

    Example

    const zbc = new ZeebeGrpcClient()
    zbc.evaluateDecision({
    decisionId: 'my-decision',
    variables: { season: "Fall" }
    }).then(res => console.log(JSON.stringify(res, null, 2)))
  • If this.retry is set to true, the operation is wrapped in a configurable retry on exceptions with gRPC error code 14 (Transient Network Failure); see https://github.com/grpc/grpc/blob/master/doc/statuscodes.md for the status codes. If this.retry is false, the operation is executed with no retry, and the application should handle the exception.

    Type Parameters

    • T

    Parameters

    • operationName: string
    • operation: (() => Promise<T>)

      A gRPC command operation

        • (): Promise<T>
        • Returns Promise<T>

    • Optional retries: number

    Returns Promise<T>

  • Parameters

    Returns Promise<void>

    Description

    Fail a job. This is useful if you are using the decoupled completion pattern or building your own worker. For the retry count, the current count is available in the job metadata.

    Example

    const zbc = new ZeebeGrpcClient()
    zbc.failJob( {
    jobKey: '345424343451',
    retries: 3,
    errorMessage: 'Could not get a response from the order invoicing API',
    retryBackOff: 30 * 1000 // optional, otherwise available for reactivation immediately
    })
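
The note about the retry count can be sketched as a small request builder. It assumes the activated job exposes `key` and `retries` fields (the `retries` field name is an assumption here), and `buildFailJobRequest` is a hypothetical helper. The request fields mirror the example above.

```typescript
// Minimal shape of the job fields used here (assumed, not the full job type).
interface JobLike {
  key: string
  retries: number
}

// Shape of the request passed to failJob, mirroring the example above.
interface FailJobRequestLike {
  jobKey: string
  retries: number
  errorMessage: string
  retryBackOff?: number
}

// Hypothetical helper: decrement the job's current retry count, never below zero.
function buildFailJobRequest(
  job: JobLike,
  errorMessage: string,
  retryBackOff?: number
): FailJobRequestLike {
  const request: FailJobRequestLike = {
    jobKey: job.key,
    retries: Math.max(job.retries - 1, 0),
    errorMessage,
  }
  // Only set the back-off when the caller asked for one.
  if (retryBackOff !== undefined) {
    request.retryBackOff = retryBackOff
  }
  return request
}
```

The result could then be passed on as `zbc.failJob(buildFailJobRequest(job, 'reason', 30 * 1000))`.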
  • Parameters

    • files: string | string[]

    Returns Promise<string[]>

    Description

    Return an array of task types contained in a BPMN file or array of BPMN files. This can be useful, for example, to do

    Example

    const zbc = new ZeebeGrpcClient()
    zbc.getServiceTypesFromBpmn(['bpmn/onboarding.bpmn', 'bpmn/process-sale.bpmn'])
    .then(tasktypes => console.log('The task types are:', tasktypes))
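
For intuition, extracting task types from BPMN XML might look roughly like the sketch below. This is not the library's implementation. It assumes service tasks declare their type through a `zeebe:taskDefinition` element with a double-quoted `type` attribute, and `extractTaskTypes` is a hypothetical name.

```typescript
// Hypothetical sketch: collect the distinct task types declared in a BPMN XML string.
function extractTaskTypes(bpmnXml: string): string[] {
  // Match <zeebe:taskDefinition ... type="..."> regardless of attribute order.
  const pattern = /<zeebe:taskDefinition\b[^>]*?\btype="([^"]*)"/g
  const types: string[] = []
  let match: RegExpExecArray | null
  while ((match = pattern.exec(bpmnXml)) !== null) {
    // Keep first-seen order and skip duplicates.
    if (types.indexOf(match[1]) === -1) {
      types.push(match[1])
    }
  }
  return types
}
```

A regular expression is enough for a sketch, though a real XML parser would be more robust against unusual formatting.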
  • Parameters

    Returns Promise<ModifyProcessInstanceResponse>

    Description

    Modify a running process instance. This allows you to move the execution tokens, and change the variables. Added in 8.1. See the gRPC protocol documentation.

    Example

    zbc.createProcessInstance({ bpmnProcessId: 'SkipFirstTask', variables: {} }).then(res =>
    zbc.modifyProcessInstance({
    processInstanceKey: res.processInstanceKey,
    activateInstructions: [{
    elementId: 'second_service_task',
    ancestorElementInstanceKey: "-1",
    variableInstructions: [{
    scopeId: '',
    variables: { second: 1}
    }]
    }]
    })
    )
  • Type Parameters

    Parameters

    Returns Promise<PublishMessageResponse>

    Description

    Publish a message to the broker for correlation with a workflow message start event. For a message targeting a start event, the correlation key is not needed to target a specific running process instance. However, the hash of the correlationKey determines the partition where the workflow will start, so when no correlationKey is supplied a random uuid is assigned to balance workflow instances created via start message across partitions.

    We make the correlationKey optional, because the caller can specify a correlationKey + messageId to guarantee an idempotent message.

    Multiple messages with the same correlationKey + messageId combination will only start a workflow once. See: https://github.com/zeebe-io/zeebe/issues/1012 and https://github.com/zeebe-io/zeebe/issues/1022

    Example

    const zbc = new ZeebeGrpcClient()
    zbc.publishStartMessage({
    name: 'Start_New_Onboarding_Flow',
    variables: {
    customerId: 'uuid-348-234-8908'
    }
    })

    // To do the same in an idempotent fashion - note: only idempotent during the lifetime of the created instance.
    zbc.publishStartMessage({
    name: 'Start_New_Onboarding_Flow',
    messageId: 'uuid-348-234-8908', // use customerId to make process idempotent per customer
    variables: {
    customerId: 'uuid-348-234-8908'
    }
    })
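
The correlationKey defaulting described above can be sketched as follows. This is an illustration rather than the client's actual code: `withCorrelationKey` and the `StartMessage` shape are hypothetical, and Node's built-in `randomUUID` stands in for whatever UUID generator the library uses.

```typescript
import { randomUUID } from 'node:crypto'

// Hypothetical request shape, limited to the fields discussed above.
interface StartMessage {
  name: string
  messageId?: string
  correlationKey?: string
  variables: Record<string, unknown>
}

// Keep a caller-supplied correlationKey (needed, together with messageId, for
// idempotency); otherwise assign a random one to spread instances across partitions.
function withCorrelationKey(
  message: StartMessage
): StartMessage & { correlationKey: string } {
  return { ...message, correlationKey: message.correlationKey ?? randomUUID() }
}
```

Because the partition is derived from a hash of the key, random keys spread start messages roughly evenly, while a fixed key pins them to one partition.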
  • Parameters

    Returns Promise<void>

    Description

    Resolve an incident by incident key.

    Example

    type JSONObject = {[key: string]: string | number | boolean | JSONObject}

    const zbc = new ZeebeGrpcClient()

    async function updateAndResolveIncident({
      processInstanceKey,
      incidentKey,
      jobKey,
      variables
    } : {
      processInstanceKey: string,
      incidentKey: string,
      jobKey: string,
      variables: JSONObject
    }) {
      // Update the variables, give the job a retry, then resolve the incident
      await zbc.setVariables({
        elementInstanceKey: processInstanceKey,
        variables
      })
      await zbc.updateJobRetries({
        jobKey,
        retries: 1
      })
      return zbc.resolveIncident({
        incidentKey
      })
    }
  • This function takes a gRPC operation that returns a Promise as a function, and invokes it. If the operation throws gRPC error 14, this function will continue to try it until it succeeds or retries are exhausted.

    Type Parameters

    • T

    Parameters

    • operation: {
          operation: (() => Promise<T>);
          operationName: string;
          retries?: number;
      }

      A gRPC command operation that may fail if the broker is not available

      • operation: (() => Promise<T>)
          • (): Promise<T>
          • Returns Promise<T>

      • operationName: string
      • Optional retries?: number

    Returns Promise<T>
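
As an illustration of the behaviour described above, a retry loop of this kind might look like the sketch below. It is not the library's implementation: `retryOnUnavailable`, its fixed delay, and the assumption that errors expose a numeric `code` are all illustrative. Code 14 is the gRPC UNAVAILABLE status.

```typescript
const GRPC_UNAVAILABLE = 14

function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms))
}

// Hypothetical sketch: retry only on gRPC code 14, rethrow anything else.
async function retryOnUnavailable<T>(
  operation: () => Promise<T>,
  retries = 3,
  delayMs = 100
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation()
    } catch (err) {
      const code = (err as { code?: number } | null)?.code
      // Give up on non-transient errors or once retries are exhausted.
      if (code !== GRPC_UNAVAILABLE || attempt >= retries) {
        throw err
      }
      await sleep(delayMs)
    }
  }
}
```

A production version would typically use a growing, capped delay rather than a fixed one.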

  • Type Parameters

    Parameters

    Returns Promise<void>

    Description

    Directly modify the variables in a process instance. This can be used with resolveIncident to update the process and resolve an incident.

    Example

    type JSONObject = {[key: string]: string | number | boolean | JSONObject}

    const zbc = new ZeebeGrpcClient()

    async function updateAndResolveIncident({
    incidentKey,
    processInstanceKey,
    jobKey,
    variableUpdate
    } : {
    incidentKey: string
    processInstanceKey: string
    jobKey: string
    variableUpdate: JSONObject
    }) {
    await zbc.setVariables({
    elementInstanceKey: processInstanceKey,
    variables: variableUpdate
    })
    await zbc.updateJobRetries({
    jobKey,
    retries: 1
    })
    return zbc.resolveIncident({
    incidentKey
    })
    }
  • Type Parameters

    Parameters

    Returns Promise<{
        close: (() => void);
    }>

    Description

    Create a worker that uses the StreamActivatedJobs RPC to activate jobs. NOTE: This will only stream jobs created after the worker is started. To activate existing jobs, use activateJobs or createWorker.

    Example

    const zbc = new ZeebeGrpcClient()

    // streamJobs returns a Promise, so await it before using the worker
    const zbStreamWorker = await zbc.streamJobs({
    type: 'demo-service',
    worker: 'my-worker-uuid',
    taskHandler: myTaskHandler,
    timeout: 30000 // 30 seconds
    })

    ....
    // Close the worker stream when done
    zbStreamWorker.close()

    // A job handler must return one of job.complete, job.fail, job.error, or job.forward
    // Note: unhandled exceptions in the job handler cause the library to call job.fail
    async function myTaskHandler(job) {
    console.log('Task variables', job.variables)

    // Task worker business logic goes here
    const updateToBrokerVariables = {
    updatedProperty: 'newValue',
    }

    const res = await callExternalSystem(job.variables)

    if (res.code === 'SUCCESS') {
    return job.complete({
    ...updateToBrokerVariables,
    ...res.values
    })
    }
    if (res.code === 'BUSINESS_ERROR') {
    return job.error({
    code: res.errorCode,
    message: res.message
    })
    }
    if (res.code === 'ERROR') {
    return job.fail({
    errorMessage: res.message,
    retryBackOff: 2000
    })
    }
    }
  • Parameters

    Returns Promise<void>

    Description

    Fail a job by throwing a business error (i.e. non-technical) that occurs while processing a job. The error is handled in the workflow by an error catch event. If there is no error catch event with the specified errorCode then an incident will be raised instead. This method is useful when building a worker, for example for the decoupled completion pattern.

    Example

    type JSONObject = {[key: string]: string | number | boolean | JSONObject}

    interface errorResult {
      resultType: 'ERROR'
      errorCode: string
      errorMessage: string
    }

    interface successResult {
      resultType: 'SUCCESS'
      variableUpdate: JSONObject
    }

    type Result = errorResult | successResult

    const zbc = new ZeebeGrpcClient()


    // This could be a listener on a return queue from an external system
    async function handleJob(jobKey: string, result: Result) {
    if (result.resultType === 'ERROR') {
    const { errorMessage, errorCode } = result
    zbc.throwError({
    jobKey,
    errorCode,
    errorMessage
    })
    } else {
    zbc.completeJob({
    jobKey,
    variables: result.variableUpdate
    })
    }
    }
  • Parameters

    Returns Promise<void>

    Description

    Update the number of retries for a Job. This is useful if a job has zero remaining retries and fails, raising an incident.

    Example

    type JSONObject = {[key: string]: string | number | boolean | JSONObject}

    const zbc = new ZeebeGrpcClient()

    async function updateAndResolveIncident({
    incidentKey,
    processInstanceKey,
    jobKey,
    variableUpdate
    } : {
    incidentKey: string
    processInstanceKey: string
    jobKey: string
    variableUpdate: JSONObject
    }) {
    await zbc.setVariables({
    elementInstanceKey: processInstanceKey,
    variables: variableUpdate
    })
    await zbc.updateJobRetries({
    jobKey,
    retries: 1
    })
    return zbc.resolveIncident({
    incidentKey
    })
    }
  • Updates the deadline of a job using the timeout (in ms) provided. This can be used for extending or shortening the job deadline.

    Errors:

    NOT_FOUND:

    • no job exists with the given key

    INVALID_STATE:

    • no deadline exists for the given job key

    Parameters

    Returns Promise<void>
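
A minimal sketch of extending a job deadline while tolerating the NOT_FOUND case. The parameters are not shown on this page, so the method name `updateJobTimeout` and the `{ jobKey, timeout }` request shape are assumptions, as is the numeric `code` on the error (5 is the gRPC NOT_FOUND status). `extendJobDeadline` is a hypothetical helper.

```typescript
// gRPC status code for NOT_FOUND.
const GRPC_NOT_FOUND = 5

// Assumed minimal client surface; the method name and request fields are assumptions.
interface JobTimeoutClient {
  updateJobTimeout(request: { jobKey: string; timeout: number }): Promise<void>
}

// Hypothetical helper: extend a job's deadline and report whether the job still existed.
async function extendJobDeadline(
  client: JobTimeoutClient,
  jobKey: string,
  timeoutMs: number
): Promise<boolean> {
  try {
    await client.updateJobTimeout({ jobKey, timeout: timeoutMs })
    return true
  } catch (err) {
    // NOT_FOUND means no job exists with this key; treat it as nothing to extend.
    if ((err as { code?: number } | null)?.code === GRPC_NOT_FOUND) {
      return false
    }
    // Anything else, including INVALID_STATE, is left for the caller to handle.
    throw err
  }
}
```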
