Информация для разработчиков⚓︎
Классификация процессов⚓︎
В системе доступно два вида процессов: обычные (вечные) и батчевые.
Обычные (вечные) процессы⚓︎
Поднимаются в единственном экземпляре и работают непрерывно. Обычные процессы могут быть подключены к HTTP-шлюзам для обеспечения внешних взаимодействий. Очереди сообщений в таких процессах сохраняются при выключении / падении потока.
Батчевые процессы⚓︎
Поднимаются по шаблону, указанному в манифесте батчевого процесса.
Манифест позволяет указать количество итераций выполнения потока (процесса) и уровень параллелизма. Батчевые процессы позволяют добится конкурентного выполнения задач, задавая количество экземпляров процесса (N потоков) и количество итераций, которые каждый экземпляр должен выполнить (M). Таким образом, достигается M*N запусков задачи, с одним ограничением: все потоки выполняют одну и ту же работу.
По завершении каждого дочернего процесса, связанные с ним очереди сообщений в брокере очищаются, если это предусмотрено в процессе.
Регламенты запуска батчевых процессов⚓︎
Регламенты определяют периодичность запуска батчевых процессов. Если к моменту очередного запуска предыдущий еще не закончен, система применяет одну из трех стратегий:
Текущий процесс останавливается и запускается новый;
Новый запуск игнорируется, при этом последующие запуски произойдут согласно регламенту;
Новый процесс запускается, несмотря на то, что предыдущий еще выполняется.
Разработка модуля⚓︎
Пользовательская часть кода модуля должна отвечать нескольким требованиям:
Это должен быть HTTP-сервис, слушающий порт 8000
и принимающий json-задачи на эндпоинт /work
с помощью метода POST
.
Модуль должен быть упакован в контейнер.
Настоятельно рекомендуется не хранить никакое состояние в оперативной памяти, поскольку система не гарантирует непрерывную работу контейнеров.
Как работает⚓︎
Сайдкар получает задачи из сконфигурированной очереди, затем сайдкар обращается к DWH и получает объект из него с учетом проекции (при необходимости). Полученные данные преобразуются с помощью входного маппинга, после чего отправляются в модуль по HTTP
на 8000
порт по пути /work
. Получив ответ, применяет выходной маппинг и, при необходимости, сохраняет документы в DWH. Далее, сайдкар инициирует обработку ранафтеров, последовательно применяя JsonLogic-условия каждого ранафтера к результату выходного маппинга. Если условие выполняется, применяется соответствующий трансформационный маппинг, и данные передаются на вход следующему модулю в цепочке обработки.
Чем должен обладать код модуля, чтобы работать во FlowMaster⚓︎
Для корректной работы модуля во FlowMaster, он должен обладать следующими требованиями:
- Быть HTTP-сервисом;
- Слушать 8000 порт;
- Иметь эндпоинт POST /work
;
- С эндпоинта POST /work
возвращать объект следующей структуры (опционально):
{
"status": "SUCCESS",
"deliverAfter": "",
"deliverAt": "",
"data": "<что-то, что отдал обработчик>"
}
где
-
status
- статус задачи. Может быть один из SUCCESS
(задача выполнилась успешно), DROP
(задачу необходимо удалить. Ранафтеры не выполняются), END
(задача выполнилась успешно, но ранафтеры выполнять не надо), RETRY
(задача выполнилась с ошибкой, и ее необходимо вернуть в очередь).-
deliverAfter
и deliverAt
- параметры для отложенного выполнения задачи. deliverAfter
- время, через которое задачу нужно передать.deliverAt
- конкретное время начала выполнения задачи. Поддержка и формат параметров зависят от брокера сообщений.
-
data
- JSON-значение. Массив трактуется как несколько отдельных задач, передающихся дальше по отдельности. Элементы массива data
обрабатываются всеми следующими этапами обработки ОТДЕЛЬНО друг от друга.Т.е., если в массиве
data
лежит два JSON-значения {"value": "important data"}
и {"value": "more important data"}
, то следующие шаги (выходной маппинг (см. пункт маппинги), сохранение, применение условий, трансформационный маппинг, передача по следующим очередям) будут выполнены ПОСЛЕДОВАТЕЛЬНО с каждым из этих JSON-значений.
Маппинги⚓︎
Маппинги - используемые во Flowmaster инструменты для трансформации данных на связях между модулями и между модулем и входными шлюзами. Маппинги используют предметно-ориентированный язык обработки данных jq
. Документацию по jq
можно найти тут: ссылка.
Переменные в маппингах JQ⚓︎
В маппингах jq
можно использовать специальные переменные:
- $fromconfig
: объект со всеми настройками модуля.
- $fromdb
: полученный из DWH объект в случае, если получение из DWH было задействовано. В ином случае ничего.
- $task_id
: UUID присвоенный задаче. Используется, когда надо отследить выполнение задачи в процессе.
- $module_token
: токен из Keycloak, авторизованный под соответствующим сервис-аккаунтом.
После того, как модуль будет добавлен в систему через интерфейс добавления, он станет доступен для использования в любом процессе..
Связи модулей⚓︎
Концепция⚓︎
Модули можно связывать. Таким образом один модуль будет передавать задачи в следующий модуль.
Поля (в сущности процесса в kubernetes)⚓︎
type⚓︎
Тип связи. Возможно четыре значения: regular
для обычных связей,externalCall
для вызова подпроцесса,return
для возврата из подпроцесса в вызвавший процесс,httpEndpointCall
для якорей.
deliverAt⚓︎
Параметр deliverAt
позволяет отложить выполнение задачи на ребре, указав точное время доставки (доступен только при поддержке брокером сообщений данного функционала). Значение deliverAt
, заданное на уровне ребра, имеет более высокий приоритет, чем значение, возвращаемое модулем. Формат зависит от брокера (тип regular
, return
и externalCall
)
deliverAfter⚓︎
Параметр deliverAfter
позволяет отложить выполнение задачи на ребре, указав временной интервал задержки, например, "10s" - 10 секунд (доступен только при поддержке брокером сообщений данного функционала). Значение deliverAfter
, заданное на уровне ребра, имеет более высокий приоритет, чем значение, возвращаемое модулем.
Формат зависит от брокера (тип regular
, return
и externalCall
)
module⚓︎
Модуль выбранного процесса, в который нужно отправить задачу (тип regular
)
topic⚓︎
Параметр topic
позволяет направить поток задач не в модуль текущего процесса (типы regular
и externalCall
).
broker⚓︎
Используемый брокер сообщений из массива brokers
, объявленного в рамках этого процесса. Используется при передаче задач в топик (типы regular
, return
и externalCall
)
condition⚓︎
JSON-строка с JSONLogic-условием на связь (все типы)
mapping⚓︎
Параметр mapping
позволяет изменять задачу непосредственно перед ее отправкой, применяя JQ-выражение (см. пункт маппинги). Структура с тремя полями, маппинг хранится в поле mapping
(все типы)
httpEndpoint⚓︎
URL, который используется для передачи задачи по HTTP вместо брокера сообщений. Настройки маппинга и JSONLogiс также учитываются (тип httpEndpointCall
)
httpEndpointMethod⚓︎
Метод для использования при передаче по HTTP (тип httpEndpointCall
)
externalWorkflowName⚓︎
Имя вызываемого подпроцесса (тип externalCall
)
externalWorkflowStartModule⚓︎
Имя модуля вызываемого подпроцесса (тип externalCall
)
externalWorkflowLocalReturnModules⚓︎
Модули вызывающего процесса, в которые нужно вернуть результат при возврате из подпроцесса (тип externalCall
)
Несколько примеров⚓︎
передача задачи в модуль rabota-2
этого же процесса с маппингом и условием:
- condition: `{"==":[{"var":"status"}, "ok"]}`
mapping:
keyField: ""
mapping: "{value: .response.value}"
schemaName: ""
module: rabota-2
- передача по HTTP:
- condition: '{"==":[{"var":"value"}, "some value"]}'
httpEndpoint: http://example.com/api/v1/log
httpEndpointMethod: POST
mapping:
keyField: ""
mapping: .monitoring_value
schemaName: ""
JSON-схемы⚓︎
Во многих компонентах системы используются JSON-схемы. JSON-схемы - это формат описания структуры данных согласно спецификации JSON-схем седьмой редакции.
(спецификация). JSON-схемы хранятся в schema-registry
.
API-шлюзы⚓︎
Концепция⚓︎
API-шлюзы выступают в роли входных точек для интеграции внешних систем с потоковыми (вечными) процессами. Они позволяют принимать задачи по HTTP и направлять их в соответствующие модули процесса.
API-шлюз может направлять задачи в один или одновременно в несколько модулей процесса, фильтровать задачи согласно заданным условиям, а также применять маппинги к поступающим данным (см. пункт маппинги).
API-шлюз можно представить как специализированный модуль, получающий данные по HTTP, а не из очереди задач. Вместо собственной обработки данных, он сразу перенаправляет их в другие модули, основываясь на настройках маршрутизации, фильтрации и маппинга.
Как работает⚓︎
- Зайти на страницу какого-либо процесса.
- Добавить начальный узел
Старт
(вкладкаУтилиты
рядом сМодули
)
- В начальном узле доступны три параметра конфигурации:
Хост - казывает хост, на котором будет доступен API-шлюз.
Схема - пределяет структуру данных, отправляемых на шлюз. В это поле необходимо вписать URL форматаhttp://schema-registry.example.com/api/v1/schemas/{SchemaName}
, по которому доступна JSON-схема данных (см. JSON-схемы). Схема используется для генерации Swagger и валидации входных данных.
Избегайте использования слишком больших схем (десятки тысяч строк с множеством ссылок), так как их обработка может занять значительное время.
Путь - задает путь, по которому шлюз будет принимать запросы.
Важно! Сервер, предоставляющий схему, должен корректно обрабатывать заголовок Accept: application/yaml и возвращать данные в формате YAML. - Все параметры начального узла (хост, схема, путь) являются необязательными. Если не указать хост, FlowMaster использует хост по умолчанию, который задается в параметрах process-manager при развертывании. При отсутствии URL схемы валидация входных данных не производится. Если не указать путь, он генерируется автоматически на основе идентификатора узла и идентификатора процесса.
- При отсутствии значения для хоста, система автоматически определит и подставит его после сохранения. Обновленная информация будет доступна при следующем открытии страницы.
- Готово. Документация Swagger для API-шлюза доступна по адресу: {host}/api/docs/. Поскольку один и тот же шлюз может использоваться несколькими процессами, документация для каждого процесса располагается на отдельной вкладке в интерфейсе Swagger.
Сайдкар FlowMaster⚓︎
Концепция⚓︎
Каждый модуль в системе запускается вместе с вспомогательным компонентом - сайдкаром (apr-runtime). Сайдкар берет на себя взаимодействие с очередями, хранилищем данных DWH и выполняет ряд дополнительных функций.
Сайдкар получает задачу из назначенной очереди и извлекает дополнительные данные из DWH с учетом проекции (если нужно). Затем, сайдкар применяет входной маппинг (см. пункт маппинги) к полученным данным и отправляет их по HTTP на порт 8000 по адресу /work
в модуль. Получив ответ от модуля, сайдкар применяет выходной маппинг, а при необходимости сохраняет обработанные данные в DWH. Далее для каждого ранафтера применяет его JSONLogiс условие на результат после выходного маппинга, и если оно выполняется, применяет трансформационный маппинг и передает следующему модулю.
Схeма работы⚓︎
Переменные окружения (для каждого модуля)⚓︎
flowmaster_workflow_id
: id процесса, в котором поднят модуль.- Настройки процесса. Настройки процесса передаются в том числе в переменных окружения.
Переменные в маппингах JQ⚓︎
В маппингах JQ можно использовать специальные переменные:
- $fromconfig
: объект со всеми настройками модуля.
- $fromdb
: полученный из DWH объект (в случае, если получение из DWH было задействовано, ином случае ничего).
- $task_id
: UUID присвоенный задаче. Используется, когда надо отследить выполнение таски в процессе.
- $module_token
: токен из keycloak, авторизованный под соответствующим сервис аккаунтом.
Функции в маппингах JQ⚓︎
get_service_account_token(client_id; client_secret)
- получает токен из-под предоставленного клиента. Важно: аргументы передаются через точку с запятой.
Эндпоинты АПР⚓︎
АПР функционирует также как сервис, доступный на локальном хосте по порту 40000.
Авторизация⚓︎
- GET
/api/v1/get-service-account-token
- возвращает сервис-аккаунт токен модуля. - GET
/api/v1/get-token/{clientId}/{clientSecret}
- возвращает токен, выданный на основе предоставленных учетных данных.
Токены получаются только тогда, когда нужны; получение токена несколько раз сразу не приведет к новым вызовам keycloak, а вернет одинаковый токен.
Очереди⚓︎
- PUT
/api/v1/brokers/{broker_name}/{topic_name}
- отправляет задачу в указанный топик брокера. Вы можете использоватьbroker_name
со значениемstdrmq
для брокераRabbitMQ
и stdpsr дляPulsar
. На данный момент настройка брокеров из интерфейса платформы недоступна. - GET
/api/v2/brokers/{broker_name}/{topic_name}/tasks-remaining
- возвращает количество задач, оставшееся в топике.
Флаги и настройки модуля (в манифесте)⚓︎
useTaskBatchMode
– отправляет входной массив данных в модуль полностью, а не по одной сущности.useResponseBatchMode
– трактует полученный в качестве ответа массив не как массив отдельных задач, а как одну batch-задачу.useGetFromDwh
– в случае, если необходимо использовать данные из DWH на входном порту, вам потребуется указать JSON-схему, соответствующую структуре данных в DWH (см. JSON-схемы, а также URL-адрес для подключения к DWH.useSaveToDwh
– в случае, если необходимо сохранить ответ от сервиса в DWH, вам потребуется указать JSON-схему для сохраняемого ответа (см. JSON-схемы, а также URL-адрес для подключения к DWH.
Пробы FlowMaster⚓︎
Концепция⚓︎
Перед запуском процесса обработки данных можно настроить автоматическую проверку доступности внешних сервисов по протоколам HTTP или TCP. Эта проверка называется пробой.
Общие поля (в манифесте процесса)⚓︎
periodSeconds⚓︎
Периодичность пробы в секундах (дефолт - 1 проба в секунду).
successThreshold⚓︎
Количество успешных проб подряд, необходимых для заключения об успешности пробы.
initialDelaySeconds⚓︎
Сколько ждать перед тем, как начать выполнять пробы.
Поля TCP-проб (в манифесте процесса)⚓︎
host⚓︎
Хост TCP-пробы.
port⚓︎
Порт TCP-пробы.
Поле НТТР-проб (в манифесте процесса)⚓︎
httpEndpoint⚓︎
HTTP-эндпоинт для выполнения запроса.
Пример(в манифесте модуля/в манифесте процесса в модуле)⚓︎
HTTPRuntimeProbes:
- httpEndpoint: http://192.168.1.2:1111/api/docs/
TCPRuntimeProbes:
- host: 192.168.1.2
port: 1111
initialDelaySeconds: 1
periodSeconds: 1
successThreshold: 1
Подпроцессы⚓︎
Концепция⚓︎
Процессы могут запускать друг друга для выполнения подзадач. Допустим, что есть процессы А, Б, Ц и Д. Процесс А может вызвать процесс Б, который, в свою очередь, вызовет процесс Ц, а тот, в свою очередь, - процесс Д. Важно, что результат каждого вызова может быть передан обратно по цепочке. Соответственно, рекурсия в таком случае тоже возможна.
Настройка вызова подпроцесса в манифесте⚓︎
- Откройте манифест процесса, который будет вызывать подпроцесс.
- Найдите модуль, который будет ответственным за запуск подпроцесса.
- В ранафтере этого модуля добавьте следующие поля:
type: externalCall externalWorkflowName: subproc externalWorkflowStartModule: idx-5 externalWorkflowLocalReturnModules: - subprocreturn-6
где: type: externalCall
отвечает за, собственно, вызов иного процесса.externalWorkflowName: subproc
- указывает на имя вызываемого процесса.externalWorkflowStartModule: idx-5
- указывает на модуль вызываемого процесса, в который последует задача.externalWorkflowLocalReturnModules
- указывает на список модулей ВЫЗЫВАЮЩЕГО процесса, в которые задача должна вернуться после выполнения.- Если возврат из вызываемого процесса необходим, требуется указать в вызываемом процессе, какой модуль(-и) будет(-ут) использоваться для возврата. Для этого такому модулю требуется указать такой ранафтер:
где: type: return
- отвечает за, собственно, возврат.brokerToReturnTo: stdrmq-nonlocal
- указывает на брокер сообщений, который будет использоваться при возврате. У модулей, в которые будет производиться возврат, появится подписка на соответствующий топик в этом брокере.
Nota bene⚓︎
- Процесс, вызываемый другими подпроцессами, не только не может быть удален, но и не может быть изменен в части, используемой другими процессами.