Skip to content

Информация для разработчиков⚓︎

Классификация процессов⚓︎

В системе доступно два вида процессов: обычные (вечные) и батчевые.

Обычные (вечные) процессы⚓︎

Поднимаются в единственном экземпляре и работают непрерывно. Обычные процессы могут быть подключены к HTTP-шлюзам для обеспечения внешних взаимодействий. Очереди сообщений в таких процессах сохраняются при выключении / падении потока.

Батчевые процессы⚓︎

Поднимаются по шаблону, указанному в манифесте батчевого процесса.
Манифест позволяет указать количество итераций выполнения потока (процесса) и уровень параллелизма. Батчевые процессы позволяют добится конкурентного выполнения задач, задавая количество экземпляров процесса (N потоков) и количество итераций, которые каждый экземпляр должен выполнить (M). Таким образом, достигается M*N запусков задачи, с одним ограничением: все потоки выполняют одну и ту же работу.
По завершении каждого дочернего процесса, связанные с ним очереди сообщений в брокере очищаются, если это предусмотрено в процессе.

Регламенты запуска батчевых процессов⚓︎

Регламенты определяют периодичность запуска батчевых процессов. Если к моменту очередного запуска предыдущий еще не закончен, система применяет одну из трех стратегий:
Текущий процесс останавливается и запускается новый;
Новый запуск игнорируется, при этом последующие запуски произойдут согласно регламенту;
Новый процесс запускается, несмотря на то, что предыдущий еще выполняется.

Разработка модуля⚓︎

Пользовательская часть кода модуля должна отвечать нескольким требованиям:
Это должен быть HTTP-сервис, слушающий порт 8000 и принимающий json-задачи на эндпоинт /work с помощью метода POST.
Модуль должен быть упакован в контейнер.
Настоятельно рекомендуется не хранить никакое состояние в оперативной памяти, поскольку система не гарантирует непрерывную работу контейнеров.

Как работает⚓︎

Сайдкар получает задачи из сконфигурированной очереди, затем сайдкар обращается к DWH и получает объект из него с учетом проекции (при необходимости). Полученные данные преобразуются с помощью входного маппинга, после чего отправляются в модуль по HTTP на 8000 порт по пути /work. Получив ответ, применяет выходной маппинг и, при необходимости, сохраняет документы в DWH. Далее, сайдкар инициирует обработку ранафтеров, последовательно применяя JsonLogic-условия каждого ранафтера к результату выходного маппинга. Если условие выполняется, применяется соответствующий трансформационный маппинг, и данные передаются на вход следующему модулю в цепочке обработки.

Чем должен обладать код модуля, чтобы работать во FlowMaster⚓︎

Для корректной работы модуля во FlowMaster, он должен обладать следующими требованиями:
- Быть HTTP-сервисом;
- Слушать 8000 порт;
- Иметь эндпоинт POST /work;
- С эндпоинта POST /work возвращать объект следующей структуры (опционально):

{
   "status": "SUCCESS",
   "deliverAfter": "",
   "deliverAt": "",
   "data": "<что-то, что отдал обработчик>"
}

где
- status - статус задачи. Может быть один из SUCCESS (задача выполнилась успешно), DROP (задачу необходимо удалить. Ранафтеры не выполняются), END (задача выполнилась успешно, но ранафтеры выполнять не надо), RETRY (задача выполнилась с ошибкой, и ее необходимо вернуть в очередь).
- deliverAfter и deliverAt - параметры для отложенного выполнения задачи. deliverAfter - время, через которое задачу нужно передать.
deliverAt - конкретное время начала выполнения задачи.
Поддержка и формат параметров зависят от брокера сообщений.
- data - JSON-значение. Массив трактуется как несколько отдельных задач, передающихся дальше по отдельности. Элементы массива data обрабатываются всеми следующими этапами обработки ОТДЕЛЬНО друг от друга.
Т.е., если в массиве data лежит два JSON-значения {"value": "important data"} и {"value": "more important data"}, то следующие шаги (выходной маппинг (см. пункт маппинги), сохранение, применение условий, трансформационный маппинг, передача по следующим очередям) будут выполнены ПОСЛЕДОВАТЕЛЬНО с каждым из этих JSON-значений.

Маппинги⚓︎

Маппинги - используемые во Flowmaster инструменты для трансформации данных на связях между модулями и между модулем и входными шлюзами. Маппинги используют предметно-ориентированный язык обработки данных jq. Документацию по jq можно найти тут: ссылка.

Переменные в маппингах JQ⚓︎

В маппингах jq можно использовать специальные переменные:
- $fromconfig: объект со всеми настройками модуля.
- $fromdb: полученный из DWH объект в случае, если получение из DWH было задействовано. В ином случае ничего.
- $task_id: UUID присвоенный задаче. Используется, когда надо отследить выполнение задачи в процессе.
- $module_token: токен из Keycloak, авторизованный под соответствующим сервис-аккаунтом.
После того, как модуль будет добавлен в систему через интерфейс добавления, он станет доступен для использования в любом процессе..

Связи модулей⚓︎

Концепция⚓︎

Модули можно связывать. Таким образом один модуль будет передавать задачи в следующий модуль.

Поля (в сущности процесса в kubernetes)⚓︎

type⚓︎

Тип связи. Возможно четыре значения: regular для обычных связей,externalCall для вызова подпроцесса,return для возврата из подпроцесса в вызвавший процесс,httpEndpointCall для якорей.

deliverAt⚓︎

Параметр deliverAt позволяет отложить выполнение задачи на ребре, указав точное время доставки (доступен только при поддержке брокером сообщений данного функционала). Значение deliverAt, заданное на уровне ребра, имеет более высокий приоритет, чем значение, возвращаемое модулем. Формат зависит от брокера (тип regular, return и externalCall)

deliverAfter⚓︎

Параметр deliverAfter позволяет отложить выполнение задачи на ребре, указав временной интервал задержки, например, "10s" - 10 секунд (доступен только при поддержке брокером сообщений данного функционала). Значение deliverAfter, заданное на уровне ребра, имеет более высокий приоритет, чем значение, возвращаемое модулем.
Формат зависит от брокера (тип regular, return и externalCall)

module⚓︎

Модуль выбранного процесса, в который нужно отправить задачу (тип regular)

topic⚓︎

Параметр topic позволяет направить поток задач не в модуль текущего процесса (типы regular и externalCall).

broker⚓︎

Используемый брокер сообщений из массива brokers, объявленного в рамках этого процесса. Используется при передаче задач в топик (типы regular, return и externalCall)

condition⚓︎

JSON-строка с JSONLogic-условием на связь (все типы)

mapping⚓︎

Параметр mapping позволяет изменять задачу непосредственно перед ее отправкой, применяя JQ-выражение (см. пункт маппинги). Структура с тремя полями, маппинг хранится в поле mapping (все типы)

httpEndpoint⚓︎

URL, который используется для передачи задачи по HTTP вместо брокера сообщений. Настройки маппинга и JSONLogiс также учитываются (тип httpEndpointCall)

httpEndpointMethod⚓︎

Метод для использования при передаче по HTTP (тип httpEndpointCall)

externalWorkflowName⚓︎

Имя вызываемого подпроцесса (тип externalCall)

externalWorkflowStartModule⚓︎

Имя модуля вызываемого подпроцесса (тип externalCall)

externalWorkflowLocalReturnModules⚓︎

Модули вызывающего процесса, в которые нужно вернуть результат при возврате из подпроцесса (тип externalCall)

Несколько примеров⚓︎

передача задачи в модуль rabota-2 этого же процесса с маппингом и условием:

- condition: `{"==":[{"var":"status"}, "ok"]}`
  mapping:
    keyField: ""
    mapping: "{value: .response.value}"
    schemaName: ""
  module: rabota-2

- передача по HTTP:
- condition: '{"==":[{"var":"value"}, "some value"]}'
  httpEndpoint: http://example.com/api/v1/log
  httpEndpointMethod: POST
  mapping:
    keyField: ""
    mapping: .monitoring_value
    schemaName: ""

JSON-схемы⚓︎

Во многих компонентах системы используются JSON-схемы. JSON-схемы - это формат описания структуры данных согласно спецификации JSON-схем седьмой редакции.
(спецификация). JSON-схемы хранятся в schema-registry.

API-шлюзы⚓︎

Концепция⚓︎

API-шлюзы выступают в роли входных точек для интеграции внешних систем с потоковыми (вечными) процессами. Они позволяют принимать задачи по HTTP и направлять их в соответствующие модули процесса.
API-шлюз может направлять задачи в один или одновременно в несколько модулей процесса, фильтровать задачи согласно заданным условиям, а также применять маппинги к поступающим данным (см. пункт маппинги).
API-шлюз можно представить как специализированный модуль, получающий данные по HTTP, а не из очереди задач. Вместо собственной обработки данных, он сразу перенаправляет их в другие модули, основываясь на настройках маршрутизации, фильтрации и маппинга.

Как работает⚓︎

  1. Зайти на страницу какого-либо процесса.
  2. Добавить начальный узел Старт (вкладка Утилиты рядом с Модули)
    gateway_doc1.png
  3. В начальном узле доступны три параметра конфигурации:
    Хост - казывает хост, на котором будет доступен API-шлюз.
    Схема - пределяет структуру данных, отправляемых на шлюз. В это поле необходимо вписать URL формата http://schema-registry.example.com/api/v1/schemas/{SchemaName}, по которому доступна JSON-схема данных (см. JSON-схемы). Схема используется для генерации Swagger и валидации входных данных.
    Избегайте использования слишком больших схем (десятки тысяч строк с множеством ссылок), так как их обработка может занять значительное время.
    Путь - задает путь, по которому шлюз будет принимать запросы.
    Важно! Сервер, предоставляющий схему, должен корректно обрабатывать заголовок Accept: application/yaml и возвращать данные в формате YAML.
  4. Все параметры начального узла (хост, схема, путь) являются необязательными. Если не указать хост, FlowMaster использует хост по умолчанию, который задается в параметрах process-manager при развертывании. При отсутствии URL схемы валидация входных данных не производится. Если не указать путь, он генерируется автоматически на основе идентификатора узла и идентификатора процесса.
  5. При отсутствии значения для хоста, система автоматически определит и подставит его после сохранения. Обновленная информация будет доступна при следующем открытии страницы.
  6. Готово. Документация Swagger для API-шлюза доступна по адресу: {host}/api/docs/. Поскольку один и тот же шлюз может использоваться несколькими процессами, документация для каждого процесса располагается на отдельной вкладке в интерфейсе Swagger.
    gateway_doc2.png

Сайдкар FlowMaster⚓︎

Концепция⚓︎

Каждый модуль в системе запускается вместе с вспомогательным компонентом - сайдкаром (apr-runtime). Сайдкар берет на себя взаимодействие с очередями, хранилищем данных DWH и выполняет ряд дополнительных функций.
Сайдкар получает задачу из назначенной очереди и извлекает дополнительные данные из DWH с учетом проекции (если нужно). Затем, сайдкар применяет входной маппинг (см. пункт маппинги) к полученным данным и отправляет их по HTTP на порт 8000 по адресу /work в модуль. Получив ответ от модуля, сайдкар применяет выходной маппинг, а при необходимости сохраняет обработанные данные в DWH. Далее для каждого ранафтера применяет его JSONLogiс условие на результат после выходного маппинга, и если оно выполняется, применяет трансформационный маппинг и передает следующему модулю.

Схeма работы⚓︎

sidecar_doc.png

Переменные окружения (для каждого модуля)⚓︎

  • flowmaster_workflow_id: id процесса, в котором поднят модуль.
  • Настройки процесса. Настройки процесса передаются в том числе в переменных окружения.

Переменные в маппингах JQ⚓︎

В маппингах JQ можно использовать специальные переменные:
- $fromconfig: объект со всеми настройками модуля.
- $fromdb: полученный из DWH объект (в случае, если получение из DWH было задействовано, ином случае ничего).
- $task_id: UUID присвоенный задаче. Используется, когда надо отследить выполнение таски в процессе.
- $module_token: токен из keycloak, авторизованный под соответствующим сервис аккаунтом.

Функции в маппингах JQ⚓︎

  • get_service_account_token(client_id; client_secret) - получает токен из-под предоставленного клиента. Важно: аргументы передаются через точку с запятой.

Эндпоинты АПР⚓︎

АПР функционирует также как сервис, доступный на локальном хосте по порту 40000.

Авторизация⚓︎

  • GET /api/v1/get-service-account-token - возвращает сервис-аккаунт токен модуля.
  • GET /api/v1/get-token/{clientId}/{clientSecret} - возвращает токен, выданный на основе предоставленных учетных данных.

Токены получаются только тогда, когда нужны; получение токена несколько раз сразу не приведет к новым вызовам keycloak, а вернет одинаковый токен.

Очереди⚓︎

  • PUT /api/v1/brokers/{broker_name}/{topic_name} - отправляет задачу в указанный топик брокера. Вы можете использовать broker_name со значением stdrmq для брокера RabbitMQ и stdpsr для Pulsar. На данный момент настройка брокеров из интерфейса платформы недоступна.
  • GET /api/v2/brokers/{broker_name}/{topic_name}/tasks-remaining - возвращает количество задач, оставшееся в топике.

Флаги и настройки модуля (в манифесте)⚓︎

  • useTaskBatchMode – отправляет входной массив данных в модуль полностью, а не по одной сущности.
  • useResponseBatchMode – трактует полученный в качестве ответа массив не как массив отдельных задач, а как одну batch-задачу.
  • useGetFromDwh – в случае, если необходимо использовать данные из DWH на входном порту, вам потребуется указать JSON-схему, соответствующую структуре данных в DWH (см. JSON-схемы, а также URL-адрес для подключения к DWH.
  • useSaveToDwh – в случае, если необходимо сохранить ответ от сервиса в DWH, вам потребуется указать JSON-схему для сохраняемого ответа (см. JSON-схемы, а также URL-адрес для подключения к DWH.

Пробы FlowMaster⚓︎

Концепция⚓︎

Перед запуском процесса обработки данных можно настроить автоматическую проверку доступности внешних сервисов по протоколам HTTP или TCP. Эта проверка называется пробой.

Общие поля (в манифесте процесса)⚓︎

periodSeconds⚓︎

Периодичность пробы в секундах (дефолт - 1 проба в секунду).

successThreshold⚓︎

Количество успешных проб подряд, необходимых для заключения об успешности пробы.

initialDelaySeconds⚓︎

Сколько ждать перед тем, как начать выполнять пробы.

Поля TCP-проб (в манифесте процесса)⚓︎

host⚓︎

Хост TCP-пробы.

port⚓︎

Порт TCP-пробы.

Поле НТТР-проб (в манифесте процесса)⚓︎

httpEndpoint⚓︎

HTTP-эндпоинт для выполнения запроса.

Пример(в манифесте модуля/в манифесте процесса в модуле)⚓︎

HTTPRuntimeProbes:
- httpEndpoint: http://192.168.1.2:1111/api/docs/
TCPRuntimeProbes:
- host: 192.168.1.2
  port: 1111
  initialDelaySeconds: 1
  periodSeconds: 1
  successThreshold: 1

Подпроцессы⚓︎

Концепция⚓︎

Процессы могут запускать друг друга для выполнения подзадач. Допустим, что есть процессы А, Б, Ц и Д. Процесс А может вызвать процесс Б, который, в свою очередь, вызовет процесс Ц, а тот, в свою очередь, - процесс Д. Важно, что результат каждого вызова может быть передан обратно по цепочке. Соответственно, рекурсия в таком случае тоже возможна.
subproc1.png

Настройка вызова подпроцесса в манифесте⚓︎

  1. Откройте манифест процесса, который будет вызывать подпроцесс.
  2. Найдите модуль, который будет ответственным за запуск подпроцесса.
  3. В ранафтере этого модуля добавьте следующие поля:
    type: externalCall
    externalWorkflowName: subproc
    externalWorkflowStartModule: idx-5
    externalWorkflowLocalReturnModules:
    - subprocreturn-6
    

    где:
  4. type: externalCall отвечает за, собственно, вызов иного процесса.
  5. externalWorkflowName: subproc - указывает на имя вызываемого процесса.
  6. externalWorkflowStartModule: idx-5 - указывает на модуль вызываемого процесса, в который последует задача.
  7. externalWorkflowLocalReturnModules - указывает на список модулей ВЫЗЫВАЮЩЕГО процесса, в которые задача должна вернуться после выполнения.
  8. Если возврат из вызываемого процесса необходим, требуется указать в вызываемом процессе, какой модуль(-и) будет(-ут) использоваться для возврата. Для этого такому модулю требуется указать такой ранафтер:
    type: return
    brokerToReturnTo: stdrmq-nonlocal
    

    где:
  9. type: return - отвечает за, собственно, возврат.
  10. brokerToReturnTo: stdrmq-nonlocal - указывает на брокер сообщений, который будет использоваться при возврате. У модулей, в которые будет производиться возврат, появится подписка на соответствующий топик в этом брокере.

Nota bene⚓︎

  • Процесс, вызываемый другими подпроцессами, не только не может быть удален, но и не может быть изменен в части, используемой другими процессами.