Vai al contenuto

Gestione Workflow Media nello sviluppo di un Service

Il Sistema Data Analytics mette a disposizione dei Service la possibilità di archiviare, all'interno di apposite aree interne dedicate, i Workflow Media da essi generati.

Cosa sono i Media Workflows?

I Workflow Media sono contenuti multimediali quali:

  • File generici
  • Pagine HTML
  • Log
  • Immagini (es. grafici)

Argomenti per Workflow Media

A ciascun Service, il Sistema Data Analytics passerà sempre ed automaticamente un insieme di argomenti contenenti le coordinate di suddette aree di archiviazione. Tali argomenti potranno essere quindi gestiti dal programma nucleo del Service (vedi Esempio).

Gli argomenti in questione sono i seguenti:

Argomento Valore
--go_manager.base_path percorso della "cartella" all'interno dell'object store dove verranno salvati i file (fisso)
--go_manager.storage_type minio (fisso)
--go_manager.topic notifications (fisso)
--go_manager.brokers strimzi-kafka-brokers.strimzi-kafka.svc.cluster.local:9092 (fisso)
--go_manager.minIO_URL minio.minio:9000 (fisso)
--go_manager.minIO_ACCESS_KEY (redacted) (fisso)
--go_manager.minIO_SECRET_KEY (redacted) (fisso)
--go_manager.minio_bucket alida (fisso)
--go_manager.use_ssl false (fisso)

Procedura salvataggio di un Workflow Media

Se il contenuto multimediale è di tipo Log, occorrerà inviare solo i metadati sul broker Kafka interno del Sistema Data Analytics.

Se il tipo è uno dei rimanenti (File generico, Pagina HTML o Immagine), allora:

  1. Salvare il file su MinIO
  2. Inviare i relativi metadati in formato JSON al broker Kafka del Sistema Data Analytics

Per l'invio dei metadati il programma nucleo dovrà instanziare un Kafka producer.

Il messaggio JSON per il broker dovrà avere i seguenti attributi sempre disponibili:

Nome attributo Valore
name Il nome del contenuto. Verrà mostrata sulla UI
key stringa senza spazi. (Riferimento interno al Workflow Media)
uuid UUIDv4. (Generato dall'utente)
messageType picture | file | log | html
title Un titolo
description Una descrizione
var lasciare vuoto
bdaId BDA_ID (variabile d'ambiente passata dal Sistema Data Analytics al Service)
serviceId SERVICE_ID (variabile d'ambiente passata dal Sistema Data Analytics al Service)
organizationId ORGANIZATION_ID (variabile d'ambiente passata dal Sistema Data Analytics al Service)
ownerId OWNER_ID (variabile d'ambiente passata dal Sistema Data Analytics al Service)
executionId EXECUTION_ID (variabile d'ambiente passata dal Sistema Data Analytics al Service)
accessLevel ACCESS_LEVEL (variabile d'ambiente passata dal Sistema Data Analytics al Service)
executorId EXECUTOR_ID (variabile d'ambiente passata dal Sistema Data Analytics al Service)
executorName EXECUTOR_NAME (variabile d'ambiente passata dal Sistema Data Analytics al Service)
executorOrgId EXECUTOR_ORG_ID (variabile d'ambiente passata dal Sistema Data Analytics al Service)
executorOrgName EXECUTOR_ORG_NAME (variabile d'ambiente passata dal Sistema Data Analytics al Service)
created datetime timestamp
modified datetime timestamp
show true | false

Inoltre, se messageType = log allora:

Nome attributo Valore
value Se messageType = log allora:
Valore numerico o stringa da mostrare
altrimenti:
vuoto
valueType string

altrimenti se messageType != log:

Nome attributo Valore
path Lasciare vuoto se messageType = log, altrimenti:
Percorso del file (incluso nome del file) all'interno del bucket del MinIO interno del Sistema Data Analytics
extension Estensione del file (incluso il punto) (es. .csv, .html)
filename Nome del file + estensione

Esempio Python di funzioni di utilità

Example of utility functions
# ... omitted ...

# Prepare base metadata message
def prepare_metadata_json(name, messageType):
    result = {
        "name": name,
        "key": name.lower().replace(" ", "-"),
        "uuid": str(uuid.uuid4()),
        "messageType": messageType,
        "created": str(datetime.now()),
        "modified": str(datetime.now())
    }

    # Environment variables automatically passed by Data Analytics System to the Service
    # go in the metadata message
    if "BDA_ID" in os.environ:
        result['bdaId']           = os.environ.get('BDA_ID')

    if "SERVICE_ID" in os.environ:
        result['serviceId']       = os.environ.get('SERVICE_ID')

    if "ORGANIZATION_ID" in os.environ:
        result['organizationId']  = os.environ.get('ORGANIZATION_ID')

    if "OWNER_ID" in os.environ:
        result['ownerId']         = os.environ.get('OWNER_ID')

    if "EXECUTION_ID" in os.environ:
        result['executionId']     = os.environ.get('EXECUTION_ID')

    if "ACCESS_LEVEL" in os.environ:
        result['accessLevel']     = os.environ.get('ACCESS_LEVEL')

    if "EXECUTOR_ID" in os.environ:
        result['executorId']      = os.environ.get('EXECUTOR_ID')

    if "EXECUTOR_NAME" in os.environ:
        result['executorName']    = os.environ.get('EXECUTOR_NAME')

    if "EXECUTOR_ORG_ID" in os.environ:
        result['executorOrgId']   = os.environ.get('EXECUTOR_ORG_ID')

    if "EXECUTOR_ORG_NAME" in os.environ:
        result['executorOrgName'] = os.environ.get('EXECUTOR_ORG_NAME')

    return result

# For files
def prepare_file_metadata(name, messageType, path, extension, filename, **kwargs):
    # Preparing generic metadata message
    metadata              = prepare_metadata_json(name=name, messageType=messageType, **kwargs)
    metadata['path']      = path
    metadata['extension'] = extension

    if messageType == "picture":
        metadata['filename'] = filename

    return metadata

def upload_file_to_minio(minio_url, access_key, secret_key, bucket_name, object_name, local_file_path, secure=False):
  address = minio_url.replace("http://", "").replace("https://", "")
  client = Minio(address, access_key=access_key, secret_key=secret_key, secure=secure)
  client.fput_object(bucket_name, object_name, local_file_path)

# Sends metadata message to Kafka
def send_message(data, args):
    producer = _init_producer_kafka(args)
    producer.send(args.go_manager_topic, json.dumps(data).encode('utf-8'))
    producer.flush()

def send_application_media(file_to_send, file_name, file_type, args, **kwargs):
    folder = args.go_minio_path

    if folder[-1] != "/":
        folder = folder + "/"

    # Preparing metadata message for "files"
    metadata = prepare_file_metadata(
      name=file_to_send.split('.')[0], 
      messageType=file_type, 
      path=folder + file_name,
      extension=file_to_send.split('.')[-1], 
      filename=file_name, 
      **kwargs
    )

    # Uploading file to MinIO
    upload_file_to_minio(
      args.go_minio_url, 
      args.go_access_key, 
      args.go_secret_key, 
      args.go_minio_bucket, 
      folder + file_name, 
      file_to_send, 
      secure=args.
      go_use_ssl
    )

    # Sending metadata message to Kafka
    send_message(metadata, args)

def send_log(name, value, value_type, args, **kwargs):
    # Preparing generic metadata message
    message_to_sent               = prepare_metadata_json(name, "log", **kwargs)
    message_to_sent['value']      = value
    message_to_sent['valueType']  = value_type

    # Note: in the case of "logs", there is no file to upload to MinIO

    # Sending metadata message to Kafka
    send_message(message_to_sent, args)

# ... omitted ...

Esempio Python di codice del Service per l'invio dei Workflow Media

Arguments handling
import argparse
import json

def str2json(string):
    return json.loads(string)

def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0', ''):
        return False

parser = argparse.ArgumentParser()
json_example_ = {
    "test_size": 0.1,
    "train_size": 0.9,
    "random_state": 0,
    "shuffle": True,
    "stratify": None
}

parser.add_argument(
  "--json_example", 
  help="json test.",
  type=str2json, 
  default=json.dumps(json_example_)
)

parser.add_argument('--go_manager.brokers', dest='go_manager_brokers', type=str, required=True)
parser.add_argument('--go_manager.topic', dest='go_manager_topic', type=str, required=True)
parser.add_argument('--go_manager.base_path', dest='go_minio_path', type=str, required=True)
parser.add_argument('--go_manager.minio_bucket', dest='go_minio_bucket', type=str, required=True)
parser.add_argument('--go_manager.minIO_URL', dest='go_minio_url', type=str, required=True)
parser.add_argument('--go_manager.minIO_ACCESS_KEY', dest='go_access_key', type=str, required=True)
parser.add_argument('--go_manager.minIO_SECRET_KEY', dest='go_secret_key', type=str, required=True)
parser.add_argument('--go_manager.use_ssl', dest='go_use_ssl', type=str2bool, required=True)

args, unknown = parser.parse_known_args()
from arguments import args
import json
import os
from utils import send_log, send_application_media

def process():
    json_test = args.json_example

    send_log("json_test_log", json_test, 'json', args)
    send_log("test_log_int", 10, 'int', args)
    send_log("test_log_float", 0.2, 'float', args)
    send_log("test_log_str", "Hi there", 'string', args)

    with open('data.json', 'w') as json_file:
        json.dump(json_test, json_file)

    send_application_media("data.json", "data.json", "file", args)
    send_application_media("test.html", "test.html", "html", args)
    send_application_media("image.png", "image.png", "picture", args)
    send_application_media("gx.zip", "gx.zip", "file", args)

if __name__ == '__main__':
    process()

Una volta che il Service avrà generato ed inviato al Sistema Data Analytics i contenuti multimediali, questi saranno disponibili da interfaccia utente. La modalità di visualizzazione del contenuto dipende dal suo tipo:

  • File generici: disponibili per lo scaricamento
  • Pagine HTML: renderizzate all'interno di un tab separato
  • Log: mostrate su UI
  • Immagini (es. grafici): renderizzate su UI

workflow-media-viewer