Pular para o conteúdo principal

Pipelines

Pipelines são o runtime do Toposync para automação em tempo real e processamento de mídia. Um pipeline é um grafo acíclico direcionado, com nome estável, que recebe pacotes de fontes, transforma esses pacotes por operadores e opcionalmente executa efeitos como notificações, armazenamento de imagens, estados do Home Assistant ou publicação de transmissões ao vivo.

Esta página é uma visão para desenvolvedores. Ela explica a arquitetura atual e os contratos que autores de extensões devem respeitar. Ela não é uma referência completa de todos os operadores.

Onde pipelines se encaixam

O backend origin é dono da configuração, compilação, orquestração, status, telemetria e persistência dos pipelines. Operadores são fornecidos pelo core e por extensões instaladas.

Config store
-> Grafo do pipeline
-> Compilador
-> Runtime local
-> Runtime em processing server remoto
-> Eventos projetados de volta ao origin

A aplicação expõe gerenciamento de pipelines pela API do backend e pelo editor no frontend. Desenvolvedores normalmente usam:

  • GET /api/pipelines para pipelines salvos;
  • GET /api/pipelines/operators para o catálogo de operadores;
  • POST /api/pipelines/compile para validação do grafo e diagnósticos;
  • GET /api/pipelines/runtime/status para estado do runtime ativo;
  • GET /api/processing-servers para servidores remotos de processamento.

Para comandos de desenvolvimento local, veja Configuração de desenvolvimento. Para caminhos de instalação com processing servers, veja Escolha sua instalação.

Modelo de pipeline

Um pipeline salvo tem nome, flag enabled, processing_server_id, modo de editor, código Python opcional e um objeto de grafo.

O grafo contém:

  • schema_version: obrigatório, atualmente >= 1;
  • nodes: instâncias de operador com id, operator e config;
  • edges: conexões de uma porta de nó para outra;
  • limits: limites opcionais de runtime, como orçamento de memória para artifacts;
  • layout e meta: campos de editor e metadados que não devem alterar a semântica de execução.

Formato mínimo:

{
"schema_version": 1,
"nodes": [
{
"id": "camera",
"operator": "camera.source",
"config": {
"camera_id": "front"
}
},
{
"id": "motion",
"operator": "camera.motion_gate",
"config": {}
},
{
"id": "notify",
"operator": "core.notify",
"config": {
"title": "Motion detected"
}
}
],
"edges": [
{
"from": { "node": "camera", "port": "out" },
"to": { "node": "motion", "port": "in" },
"maxsize": 1,
"drop_policy": "latest_only"
},
{
"from": { "node": "motion", "port": "out" },
"to": { "node": "notify", "port": "in" }
}
]
}

Nomes de pipeline precisam ser identificadores Python válidos. Isso mantém JSON, DSL Python e referências geradas alinhados.

Operadores

Um operador é a unidade reutilizável de trabalho dentro do grafo. O backend registra operadores em um OperatorRegistry. Operadores do core são registrados na inicialização, e extensões podem adicionar seus próprios operadores durante o setup da extensão.

Uma definição de operador declara:

  • id: identificador estável como camera.source, vision.detect ou stream.publish_video;
  • portas de entrada e saída;
  • modelo Pydantic de configuração e defaults normalizados;
  • capacidades de runtime como source, sink, realtime, heavy_compute ou origin_only;
  • contratos de entrada e saída, incluindo payload keys, artifacts, campos de source, campos de media e modalidades;
  • metadados de UI para agrupamento e ordenação no editor;
  • diagnósticos opcionais para alertar configurações inválidas ou arriscadas antes do runtime.

Use share_strategy="by_signature" para trabalho puro e reutilizável. Use share_strategy="never" para operadores com efeitos colaterais ou estado interno. O registry marca esses operadores automaticamente com capacidades pure ou side_effect.

Contrato de pacote

Operadores trocam objetos Packet. Um packet carrega:

  • packet_id: identidade única do pacote;
  • stream_id: identidade lógica do stream;
  • lifecycle: open, update ou close;
  • payload: dados estruturados compatíveis com JSON;
  • artifacts: objetos binários ou em memória, como imagens;
  • metadata: metadados de runtime que não devem ser tratados como payload de usuário;
  • parent_packet_id: linhagem opcional para pacotes derivados.

O lifecycle é importante para fluxos de evento. Por exemplo, um operador de movimento ou tracking pode abrir um evento, emitir updates enquanto ele permanece ativo e fechar o evento quando a condição termina. Sinks como notificações podem usar esse lifecycle para atualizar ou fechar registros existentes em vez de criar mensagens desconectadas.

Operadores de imagem e vídeo normalmente usam o artifact main para o frame principal. Operadores devem declarar artifacts exigidos e produzidos para que UI, diagnósticos e ferramentas futuras consigam raciocinar sobre compatibilidade do grafo.

Compilação

A compilação transforma o JSON do grafo em um plano de runtime normalizado.

O compilador valida que:

  • o schema do grafo é válido;
  • ids de nós são únicos;
  • cada operator id está registrado;
  • cada config respeita o modelo Pydantic do operador;
  • entradas obrigatórias estão conectadas;
  • portas de origem e destino existem;
  • cada porta de entrada tem no máximo uma aresta upstream;
  • o grafo é DAG, sem ciclos.

A compilação também calcula ordem topológica e assinaturas estáveis para nós. Essas assinaturas permitem compartilhar nós puros equivalentes entre múltiplos pipelines locais quando possível.

O compilador também pode rejeitar combinações inseguras específicas. Por exemplo, um detector que emite eventos finitos de detecção não pode alimentar tracking diretamente; tracking precisa de anotações por frame.

Execução

Em runtime, cada nó roda como uma task assíncrona e se comunica por canais limitados. Arestas podem definir tamanho de fila e política de descarte, o que importa para streams de câmera em tempo real, onde frames antigos costumam ser piores do que frames descartados.

Políticas suportadas:

  • block;
  • drop_updates;
  • drop_oldest;
  • drop_newest;
  • latest_only;
  • keyed_latest_only.

Operadores podem executar no event loop, thread pool, process pool ou runtime externo, conforme seu execution mode. Trabalho pesado de CPU não deve rodar diretamente no event loop.

Artifacts têm orçamento de memória por pacote, por pipeline e globalmente. O runtime padrão preserva o artifact principal quando possível e remove primeiro dados em memória de artifacts derivados quando o orçamento é excedido.

Execução local e distribuída

Todo pipeline tem um processing_server_id.

  • local executa no backend origin.
  • Qualquer outro id aponta para um processing server HTTP registrado.

O origin continua dono da configuração canônica. Quando um pipeline aponta para um processing server remoto, o orquestrador envia o pipeline e um snapshot de settings para esse servidor. Eventos e observabilidade do lado de processamento voltam para o origin.

O plano distribuído usa as capacidades dos operadores:

  • operadores com origin_only ficam no origin;
  • outros operadores podem rodar no processing server;
  • arestas de processing para origin são suportadas via eventos projetados;
  • arestas de origin para processing ainda não são suportadas pelo planner atual.

Esse desenho permite mover trabalho pesado de câmeras e visão para fora do Home Assistant ou de uma Raspberry Pi, mantendo efeitos origin-only, como notificações locais e armazenamento local, sob controle do origin.

Streaming tem uma regra adicional de afinidade: um pipeline que escreve em uma transmissão hospedada com stream.publish_video precisa rodar no mesmo processing_server_id do host_server_id da transmissão. Isso evita publicar frames no engine de streaming errado.

Extensões e ownership de operadores

Extensões devem registrar operadores de domínio em vez de adicionar código específico de domínio ao core.

Exemplos atuais:

  • toposync-ext-cameras registra fontes de câmera, fontes de evento ONVIF, gates de movimento e pós-processamento de câmera;
  • toposync-ext-vision registra operadores de visão orientados a tarefa, como detecção, tracking, segmentação, classificação e recorte de objetos;
  • toposync-ext-home-assistant registra operadores de estado e ação do Home Assistant;
  • toposync-ext-streaming registra operadores de demanda e publicação de vídeo;
  • extensões opcionais podem adicionar seus próprios operadores sem alterar o modelo de grafo do core.

Para empacotamento e carregamento de extensões, veja Autoria de extensões. Para o contrato do host frontend, veja Plugin API.

Observabilidade e armazenamento

O origin registra status de runtime, métricas de nós, telemetria numérica, marcadores de imagem, camadas de armazenamento e snapshots selecionados de passos. O frontend usa isso para status, previews, diagnósticos e troubleshooting.

Áreas importantes da API:

  • /api/pipelines/{pipeline_name}/stats;
  • /api/pipelines/{pipeline_name}/telemetry/numeric;
  • /api/pipelines/{pipeline_name}/telemetry/image-markers;
  • /api/pipelines/{pipeline_name}/storage;
  • /api/pipelines/preview/frame.

Processing servers remotos podem reportar lotes de observabilidade ao origin. Telemetria numérica é aplicada no origin; marcadores de imagem remotos são conservadores porque o origin pode não conseguir ler arquivos que existem apenas no host de processamento.

Regras de design para novos operadores

Novos operadores devem seguir estas regras:

  • Prefira ids orientados a tarefa, como vision.detect, em vez de ids específicos de vendor.
  • Use modelos Pydantic de configuração com validação estrita sempre que possível.
  • Declare entradas, saídas, payload keys exigidas, artifacts exigidos, campos produzidos e modalidades.
  • Marque trabalho com efeito colateral usando share_strategy="never".
  • Marque trabalho preso ao origin com a capacidade origin_only.
  • Coloque trabalho bloqueante atrás do scheduler com execution mode apropriado.
  • Preserve o lifecycle do packet, salvo quando o operador intencionalmente cria um novo stream de eventos.
  • Mantenha payload compatível com JSON e coloque dados grandes em artifacts ou arquivos.
  • Emita diagnósticos para configurações tecnicamente válidas, mas operacionalmente arriscadas.

O que fica para páginas separadas

Esta página é o mapa, não a referência completa. Páginas separadas devem cobrir:

  • catálogo completo de operadores;
  • DSL Python;
  • UX do editor de pipelines;
  • operação de processing servers;
  • padrões de pipeline específicos de streaming;
  • provisionamento de modelos de visão;
  • retenção de telemetria e armazenamento;
  • autoria de extensões com operadores customizados.