Basic connector
A connector's main job is to communicate with other services. This tutorial shows how to create a connector that fetches data from a REST API. When the request succeeds, the connector sends the data to the next node for further processing.
An important responsibility of the connector is to evaluate the response and handle error conditions. Orchesty offers several options for controlling what happens when a call fails. All of them are described on the Response results page. The main ones are:
- Repeat - retry the same process after a specified delay
- Stop - stop processing with either a success or a failure state
- Limit - on a 'Too many requests' response, return the message to the Limiter
- Ignore - force the process to continue
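As a rough illustration of how these outcomes can relate to HTTP status codes, the sketch below maps a status code to one of the results. The `ResponseResult` type and the `classifyStatus` function are hypothetical names made up for this example and are not part of the Orchesty SDK; the thresholds are assumptions as well.

```typescript
// Hypothetical names for illustration only; not part of the Orchesty SDK.
type ResponseResult = 'continue' | 'repeat' | 'stop' | 'limit' | 'ignore';

// Maps an HTTP status code to a result under assumed rules:
// 2xx continues, 429 goes to the limiter, 404 is ignored,
// other 4xx codes stop the process, everything else is retried.
function classifyStatus(status: number): ResponseResult {
    if (status >= 200 && status < 300) {
        return 'continue';
    }
    if (status === 429) {
        return 'limit';
    }
    if (status === 404) {
        return 'ignore';
    }
    if (status >= 400 && status < 500) {
        return 'stop';
    }
    return 'repeat';
}

console.log(classifyStatus(503)); // prints "repeat"
```

Which status codes map to which result is a design decision for each connector; the mapping above is only one plausible choice.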
Prerequisites
Creating Connector
- Typescript
- PHP
First, we create a new class that, for simplicity, extends AConnector. This abstract class provides a ready-made method for using CurlSender, which is used to call REST APIs.
import AConnector from '@orchesty/nodejs-sdk/dist/lib/Connector/AConnector';
import ProcessDto from '@orchesty/nodejs-sdk/dist/lib/Utils/ProcessDto';

export const NAME = 'jsonplaceholder-get-users';

export default class GetUsersConnector extends AConnector {
    public getName(): string {
        return NAME;
    }

    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        return dto;
    }
}
First, we create a new class that, for simplicity, extends ConnectorAbstract. This abstract class provides a ready-made method for using CurlSender, which is used to call REST APIs.
namespace Pipes\PhpSdk\Connector;

use Hanaboso\CommonsBundle\Process\ProcessDto;
use Hanaboso\PipesPhpSdk\Connector\ConnectorAbstract;

final class GetUsersConnector extends ConnectorAbstract
{
    public const NAME = 'jsonplaceholder-get-users';

    public function getName(): string
    {
        return self::NAME;
    }

    public function processAction(ProcessDto $dto): ProcessDto
    {
        return $dto;
    }
}
It's a good idea to prefix connector names with a category, for example the name of the third-party service the connector interacts with: jsonplaceholder-get-users.
Next, we implement the processAction method, which calls JSONPlaceholder.
- Typescript
- PHP
import { HttpMethods } from '@orchesty/nodejs-sdk/dist/lib/Transport/HttpMethods';
import RequestDto from '@orchesty/nodejs-sdk/dist/lib/Transport/Curl/RequestDto';

export default class GetUsersConnector extends AConnector {
    // ...

    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        // Build a GET request for the JSONPlaceholder users endpoint
        const request = new RequestDto(
            'https://jsonplaceholder.typicode.com/users',
            HttpMethods.GET,
            dto,
        );

        // Send the request and store the response body in the process data
        const response = await this.getSender().send(request);
        dto.setData(response.getBody());

        return dto;
    }
}
namespace Pipes\PhpSdk\Connector;

use GuzzleHttp\Psr7\Uri;
use Hanaboso\CommonsBundle\Transport\Curl\CurlException;
use Hanaboso\CommonsBundle\Transport\Curl\CurlManager;
use Hanaboso\CommonsBundle\Transport\Curl\Dto\RequestDto;
use Hanaboso\PipesPhpSdk\Connector\Exception\ConnectorException;

final class GetUsersConnector extends ConnectorAbstract
{
    // ...

    public function processAction(ProcessDto $dto): ProcessDto
    {
        // Build a GET request for the JSONPlaceholder users endpoint
        $request = new RequestDto(
            new Uri('https://jsonplaceholder.typicode.com/users'),
            CurlManager::METHOD_GET,
            $dto,
        );

        // Send the request and store the response body in the process data
        $response = $this->getSender()->send($request);
        $dto->setData($response->getBody());

        return $dto;
    }
}
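The body returned by https://jsonplaceholder.typicode.com/users is a JSON array of user objects. Assuming the body arrives as a string, a sketch of validating it before passing it on could look like the following. The `User` interface lists only a few of the returned fields, and `isUser` and `parseUsers` are hypothetical helpers written for this example, not SDK functions.

```typescript
// Subset of the fields JSONPlaceholder returns for each user.
interface User {
    id: number;
    name: string;
    username: string;
    email: string;
}

// Type guard: checks that an unknown value has the fields we rely on.
function isUser(value: unknown): value is User {
    if (typeof value !== 'object' || value === null) {
        return false;
    }
    const record = value as Record<string, unknown>;
    return typeof record.id === 'number'
        && typeof record.name === 'string'
        && typeof record.username === 'string'
        && typeof record.email === 'string';
}

// Hypothetical helper: parses a raw response body into a list of users
// and throws when the payload does not have the expected shape.
function parseUsers(body: string): User[] {
    const parsed: unknown = JSON.parse(body);
    if (!Array.isArray(parsed) || !parsed.every(isUser)) {
        throw new Error('Unexpected response shape');
    }
    return parsed;
}

// Sample payload with a single user, shortened to the fields above.
const sample = '[{"id":1,"name":"Leanne Graham","username":"Bret","email":"Sincere@april.biz"}]';
console.log(parseUsers(sample)[0].username); // prints "Bret"
```

Validating the payload early means a malformed response fails inside the connector instead of in a later node.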
Error handling
The next step is handling error cases. In this example, we use the repeatOnErrorRanges result-code config, which decides what to do based on the response status code. This config retries the request for every status code of 300 or above. You can use a predefined config or create your own; an example of a custom config is shown in the next tutorial. In PHP, the same effect is achieved by throwing OnRepeatException when the status code is 300 or above.
Another option is to set headers, which is discussed in later tutorials.
- Typescript
- PHP
// ...
import { repeatOnErrorRanges } from '@orchesty/nodejs-sdk/dist/lib/Transport/Curl/ResultCodeRange';

export default class GetUsersConnector extends AConnector {
    // ...

    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        // ...
        // Pass the result-code config so that error responses are retried
        const response = await this.getSender().send(request, repeatOnErrorRanges);
        dto.setData(response.getBody());

        return dto;
    }
}
use Hanaboso\CommonsBundle\Exception\OnRepeatException;

final class GetUsersConnector extends ConnectorAbstract
{
    // ...

    public function processAction(ProcessDto $dto): ProcessDto
    {
        // ...
        $response = $this->getSender()->send($request);

        // Ask for a retry when the remote service did not answer with a 2xx status
        if ($response->getStatusCode() >= 300) {
            throw new OnRepeatException($dto, $response->getBody(), $response->getStatusCode());
        }

        $dto->setData($response->getBody());

        return $dto;
    }
}
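To make the retry rule concrete, here is a small sketch of the decision made for each response: status codes of 300 or above are repeated until a retry budget runs out. The names `RetryPolicy`, `Decision`, and `decide`, as well as the maximum attempt count and the fixed delay, are assumptions made for this example and do not mirror the SDK's internals.

```typescript
// Hypothetical policy shape; the SDK's own configuration may differ.
interface RetryPolicy {
    maxAttempts: number; // total tries allowed, including the first one
    delayMs: number;     // wait between tries
}

type Decision =
    | { kind: 'success' }
    | { kind: 'repeat'; waitMs: number }
    | { kind: 'fail' };

// Decides what to do after a response arrives.
// `attempt` is 1 for the first try.
function decide(status: number, attempt: number, policy: RetryPolicy): Decision {
    if (status < 300) {
        return { kind: 'success' };
    }
    if (attempt < policy.maxAttempts) {
        return { kind: 'repeat', waitMs: policy.delayMs };
    }
    return { kind: 'fail' };
}

const policy: RetryPolicy = { maxAttempts: 3, delayMs: 5000 };
console.log(decide(503, 1, policy).kind); // prints "repeat"
console.log(decide(503, 3, policy).kind); // prints "fail"
```

Keeping the decision in a pure function like this makes the retry rule easy to test without sending any real requests.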
Complete connector code
- Typescript
- PHP
import AConnector from '@orchesty/nodejs-sdk/dist/lib/Connector/AConnector';
import RequestDto from '@orchesty/nodejs-sdk/dist/lib/Transport/Curl/RequestDto';
import { repeatOnErrorRanges } from '@orchesty/nodejs-sdk/dist/lib/Transport/Curl/ResultCodeRange';
import { HttpMethods } from '@orchesty/nodejs-sdk/dist/lib/Transport/HttpMethods';
import ProcessDto from '@orchesty/nodejs-sdk/dist/lib/Utils/ProcessDto';

export const NAME = 'jsonplaceholder-get-users';

export default class GetUsersConnector extends AConnector {
    public getName(): string {
        return NAME;
    }

    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const request = new RequestDto(
            'https://jsonplaceholder.typicode.com/users',
            HttpMethods.GET,
            dto,
        );

        // Error responses are retried according to repeatOnErrorRanges
        const response = await this.getSender().send(request, repeatOnErrorRanges);
        dto.setData(response.getBody());

        return dto;
    }
}
namespace Pipes\PhpSdk\Connector;

use GuzzleHttp\Psr7\Uri;
use Hanaboso\CommonsBundle\Exception\OnRepeatException;
use Hanaboso\CommonsBundle\Process\ProcessDto;
use Hanaboso\CommonsBundle\Transport\Curl\CurlException;
use Hanaboso\CommonsBundle\Transport\Curl\CurlManager;
use Hanaboso\CommonsBundle\Transport\Curl\Dto\RequestDto;
use Hanaboso\PipesPhpSdk\Connector\ConnectorAbstract;
use Hanaboso\PipesPhpSdk\Connector\Exception\ConnectorException;

final class GetUsersConnector extends ConnectorAbstract
{
    public const NAME = 'jsonplaceholder-get-users';

    public function getName(): string
    {
        return self::NAME;
    }

    public function processAction(ProcessDto $dto): ProcessDto
    {
        $request = new RequestDto(
            new Uri('https://jsonplaceholder.typicode.com/users'),
            CurlManager::METHOD_GET,
            $dto,
        );
        $response = $this->getSender()->send($request);

        // Check the status first so that an error body is never stored as data
        if ($response->getStatusCode() >= 300) {
            throw new OnRepeatException($dto, $response->getBody(), $response->getStatusCode());
        }

        $dto->setData($response->getBody());

        return $dto;
    }
}
Registering into SDK container
- Typescript
- PHP
The last step is to register the connector in the container. This is done in the index.ts file located in the root of the src directory.
// ...
import { container } from '@orchesty/nodejs-sdk';
import CurlSender from '@orchesty/nodejs-sdk/dist/lib/Transport/Curl/CurlSender';
import GetUsersConnector from './GetUsersConnector';
// ...

export default function prepare(): void {
    // ...
    const curlSender = container.get(CurlSender);

    // Create the connector, give it a sender, and register it
    const getUsers = new GetUsersConnector()
        .setSender(curlSender);
    container.setConnector(getUsers);
    // ...
}
The last step is to register the connector in the container. This is done in the connector.yaml file located in the config directory.
# ./config/connector.yaml
services:
    _defaults:
        public: '%public.services%'

    hbpf.connector.jsonplaceholder-get-users:
        class: Pipes\PhpSdk\Connector\GetUsersConnector
        arguments:
            - '@hbpf.application_install.repository'
        calls:
            - ['setSender', ['@hbpf.transport.curl_manager']]
Building topology
Now we can create a new topology in Admin to test the new connector. In the topology, we use the connector and a user task to view the downloaded data.
We publish and activate the new topology, then run it with empty data. The User Tasks tab should now show a report with the data fetched from the remote service.