
Stored data

Especially with larger data collections, it is advisable to use data storage instead of sending the data through topology queues. We will demonstrate how to work with a data collection in data storage by modifying the connector that downloads an organization's repositories from GitHub. We built this connector in the previous data pagination tutorial, so we recommend going through that tutorial first.

Connector modification for data storage usage

For our example, we used the GitHubRepositoriesBatch connector and renamed it to GitHubStoreRepositoriesBatch.

First, we pass a DataStorageManager to the connector; it handles all work with the data storage. Then we insert the data into the collection using its store method. When storing, it is important to label the collection so that we can correctly identify it in the following steps of the process.

The first parameter is the collection ID, the only mandatory parameter identifying the collection. We used the process ID, but any string can be used. When the iterations are finished, we send this ID in a message to the followers in the topology. The second parameter of the store method is the data itself.

tip

The store method accepts two more parameters that define the collection more precisely (see the Data Storage documentation). They are particularly useful with extensions for multitenant environments, e.g. SaaS integrations whose applications can be installed and used by customers with their own authentication.
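For illustration only, a call using these optional parameters might look like the sketch below, assuming the two extra parameters identify the application and the user; the values shown are placeholders, and the authoritative parameter names are in the Data Storage documentation:

await this.dataStorageManager.store(
    processId,
    response,
    'git-hub', // application identifier (placeholder, assumed parameter)
    'user-1',  // user identifier (placeholder, assumed parameter)
);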

import DataStorageManager from '@orchesty/nodejs-sdk/dist/lib/Storage/DataStore/DataStorageManager';
import { PROCESS_ID } from '@orchesty/nodejs-sdk/dist/lib/Utils/Headers';
// ...

export default class GitHubStoreRepositoriesBatch extends ABatchNode {

    public constructor(private readonly dataStorageManager: DataStorageManager) {
        super();
    }

    // ...

    public async processAction(dto: BatchProcessDto<IInput>): Promise<BatchProcessDto> {

        // ...

        const resp = await this.getSender().send<IResponse>(req, [200]);
        const response = resp.getJsonBody();

        // The process ID serves as the collection ID
        const processId = dto.getHeader(PROCESS_ID) ?? '';
        await this.dataStorageManager.store(
            processId,
            response,
        );

        // ...

    }

}

type IResponse = IRepository[];

interface IRepository {
    repository: string;
}

// ...

Pagination and sending the control message

In the next step, we set up pagination and, after the last iteration, send a message with the ID of our collection.

// ...

export const NAME = 'git-hub-store-repositories-batch';
const PAGE_ITEMS = 5;

export default class GitHubStoreRepositoriesBatch extends ABatchNode {

    // ...

    public async processAction(dto: BatchProcessDto<IInput>): Promise<BatchProcessDto> {

        // ...

        if (response.length >= PAGE_ITEMS) {
            // A full page means more data may follow: move the cursor to the next page
            dto.setBatchCursor((Number(page) + 1).toString(), true);
        } else {
            // Last iteration: send the collection ID to the followers
            dto.addItem({ processId });
        }

        // ...
    }

}

The full connector code looks like this:

import ABatchNode from '@orchesty/nodejs-sdk/dist/lib/Batch/ABatchNode';
import DataStorageManager from '@orchesty/nodejs-sdk/dist/lib/Storage/DataStore/DataStorageManager';
import { HttpMethods } from '@orchesty/nodejs-sdk/dist/lib/Transport/HttpMethods';
import BatchProcessDto from '@orchesty/nodejs-sdk/dist/lib/Utils/BatchProcessDto';
import { PROCESS_ID } from '@orchesty/nodejs-sdk/dist/lib/Utils/Headers';

export const NAME = 'git-hub-store-repositories-batch';
const PAGE_ITEMS = 5;

export default class GitHubStoreRepositoriesBatch extends ABatchNode {

    public constructor(private readonly dataStorageManager: DataStorageManager) {
        super();
    }

    public getName(): string {
        return NAME;
    }

    public async processAction(dto: BatchProcessDto<IInput>): Promise<BatchProcessDto> {
        const page = dto.getBatchCursor('1');
        const { org } = dto.getJsonData();
        const appInstall = await this.getApplicationInstallFromProcess(dto);
        const req = await this.getApplication().getRequestDto(
            dto,
            appInstall,
            HttpMethods.GET,
            `/orgs/${org}/repos?per_page=${PAGE_ITEMS}&page=${page}`,
        );
        const resp = await this.getSender().send<IResponse>(req, [200]);
        const response = resp.getJsonBody();

        const processId = dto.getHeader(PROCESS_ID) ?? '';
        await this.dataStorageManager.store(processId, response);

        if (response.length >= PAGE_ITEMS) {
            dto.setBatchCursor((Number(page) + 1).toString(), true);
        } else {
            dto.addItem({ processId });
        }

        return dto;
    }

}

type IResponse = IRepository[];

interface IRepository {
    repository: string;
}

export interface IInput {
    org: string;
}

Connector registration

Register the connector into the container in index.ts.

// ...
import { container } from '@orchesty/nodejs-sdk';
import CurlSender from '@orchesty/nodejs-sdk/dist/lib/Transport/Curl/CurlSender';
import DbClient from '@orchesty/nodejs-sdk/dist/lib/Storage/Database/Client';
import DataStorageManager from '@orchesty/nodejs-sdk/dist/lib/Storage/DataStore/DataStorageManager';
import FileSystemClient from '@orchesty/nodejs-sdk/dist/lib/Storage/File/FileSystem';
import GitHubStoreRepositoriesBatch from './GitHubStoreRepositoriesBatch';
import GitHubApplication from './GitHubApplication';
// ...

export default function prepare(): void {

    // ...
    const fileSystemClient = new FileSystemClient();
    const curlSender = container.get(CurlSender);
    const databaseClient = container.get(DbClient);

    const gitHubApplication = new GitHubApplication();

    const dataStorageManager = new DataStorageManager(fileSystemClient);
    container.set(dataStorageManager);

    const gitHubStoreRepositoriesBatch = new GitHubStoreRepositoriesBatch(dataStorageManager)
        .setSender(curlSender)
        .setDb(databaseClient)
        .setApplication(gitHubApplication);
    container.setBatch(gitHubStoreRepositoriesBatch);
    // ...
}

With that, we are ready to download the data collection and save it to the data storage.

Getting data from data storage

To get the data from the stored collection, we create a simple custom node; we name the class, for example, LoadRepositories. Using this node, we send the retrieved data to a follower so that we can view it in a user task included at the end of the topology. The code looks like this:

import ACommonNode from '@orchesty/nodejs-sdk/dist/lib/Commons/ACommonNode';
import DataStorageManager from '@orchesty/nodejs-sdk/dist/lib/Storage/DataStore/DataStorageManager';
import ProcessDto from '@orchesty/nodejs-sdk/dist/lib/Utils/ProcessDto';

export const NAME = 'load-repositories';

export default class LoadRepositories extends ACommonNode {

    public constructor(private readonly dataStorageManager: DataStorageManager) {
        super();
    }

    public getName(): string {
        return NAME;
    }

    public async processAction(dto: ProcessDto<IInput>): Promise<ProcessDto> {
        // The key matches the { processId } item emitted by the batch connector
        const { processId } = dto.getJsonData();
        const repos = await this.dataStorageManager.load(processId);

        return dto.setJsonData(repos);
    }

}

export interface IInput {
    processId: string;
}

Node registration

Register the node into the container in index.ts.

// ...
import { container } from '@orchesty/nodejs-sdk';
import DataStorageManager from '@orchesty/nodejs-sdk/dist/lib/Storage/DataStore/DataStorageManager';
import FileSystemClient from '@orchesty/nodejs-sdk/dist/lib/Storage/File/FileSystem';
import LoadRepositories from './LoadRepositories';
// ...

export default function prepare(): void {

    // ...
    const fileSystemClient = new FileSystemClient();

    const dataStorageManager = new DataStorageManager(fileSystemClient);
    container.set(dataStorageManager);

    container.setCustomNode(new LoadRepositories(dataStorageManager));
    // ...
}

Test

Now we can build a topology to verify that our nodes are working correctly. We again include a user task after each node.

[Image: Data store topology]

At startup, we need to pass the name of the organization to the connector.
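For example, the starting message can carry a JSON body like the following; the organization name is a placeholder:

{
    "org": "my-organization"
}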

[Image: Topology run]

Now we can check in the user tasks whether the data flows through the topology as expected. While the user node should show only the ID of our collection, user2 will show the complete data we downloaded from GitHub.
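Based on the connector code above, the message arriving at the user node contains just the collection ID, for example (the ID value is illustrative):

{
    "processId": "<process id>"
}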