Vai al contenuto

Raccolta esempi di Service

Di seguito troverai una serie di esempi di Service. Per semplicità, per ciascun esempio forniremo il metamodello in formato JSON. Per maggiori informazioni sulla definizione manuale del metamodello, consultare la relativa sezione della documentazione.

Es. 1: Input e Output di Tipo Dataset su MinIO

In lavorazione ...

Diagramma

Codice Programma Nucleo

import argparse

import pandas as pd
from minio import Minio

def str2bool(v):
    """Convert a command-line value to a boolean.

    Meant to be used as an ``argparse`` ``type=`` callable.

    Args:
        v: Either an actual ``bool`` (returned unchanged) or a string.

    Returns:
        ``True`` for 'yes', 'true', 't', 'y', '1' and ``False`` for
        'no', 'false', 'f', 'n', '0' or the empty string. Matching is
        case-insensitive and ignores surrounding whitespace.

    Raises:
        argparse.ArgumentTypeError: if the text is not one of the accepted
            spellings. Returning ``None`` here would silently yield a
            non-boolean flag value.
    """
    if isinstance(v, bool):
        return v
    normalized = v.strip().lower()
    if normalized in ('yes', 'true', 't', 'y', '1'):
        return True
    if normalized in ('no', 'false', 'f', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError(f"Boolean value expected, got {v!r}")

# Command-line interface of the service: every option below is registered on this parser.
parser = argparse.ArgumentParser()

# Input dataset: its folder plus the MinIO connection settings used to read it.
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_access_key', type=str, required=True)
# NOTE(review): this flag contains ".minio_bucket" before the secret-key suffix, while the
# output-dataset secret-key flag below does not — confirm which spelling the launcher passes.
parser.add_argument('--input-dataset.minio_bucket.minIO_SECRET_KEY', dest='input_secret_key', type=str, required=True)
parser.add_argument('--input-dataset.use_ssl', dest='input_use_ssl', type=str2bool, required=True)

# Optional comma-separated list of column names to keep (consumed by select_columns).
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)

# Output dataset: destination folder plus the MinIO connection settings used to write it.
parser.add_argument('--output-dataset', dest='output_dataset', type=str, required=True)
parser.add_argument('--output-dataset.minio_bucket', dest='output_minio_bucket', type=str, required=True)
parser.add_argument('--output-dataset.minIO_URL', dest='output_minio_url', type=str, required=True)
parser.add_argument('--output-dataset.minIO_ACCESS_KEY', dest='output_access_key', type=str, required=True)
parser.add_argument('--output-dataset.minIO_SECRET_KEY', dest='output_secret_key', type=str, required=True)
parser.add_argument('--output-dataset.use_ssl', dest='output_use_ssl', type=str2bool, required=True)

# parse_known_args does not fail on undeclared options: they are collected in `unknown`.
args, unknown = parser.parse_known_args()

def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List the objects under a MinIO folder whose name ends with an extension.

    Args:
        address: MinIO endpoint; a leading ``http://`` / ``https://`` is removed
            before it is handed to the client.
        access_key: MinIO access key.
        secret_key: MinIO secret key.
        bucket_name: Bucket to list.
        folder: Folder (key prefix) inside the bucket. A leading slash is
            dropped and a trailing slash is added when missing; an empty
            folder is left empty so the bucket root is listed.
        extention: Suffix the object names must end with (e.g. ``".csv"``).
        use_ssl: Passed to the client as ``secure``.

    Returns:
        A single ``s3://bucket/key`` string when exactly one object matches,
        or a list of such strings when several match.

    Raises:
        Exception: if no object matches.
    """
    # Normalize the prefix. Guarding on emptiness avoids an IndexError when
    # the folder is "" or "/".
    if folder.startswith("/"):
        folder = folder[1:]
    if folder and not folder.endswith("/"):
        folder = folder + "/"

    endpoint = address.replace("http://", "").replace("https://", "")
    client = Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl,
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)

    # Use the public object_name attribute rather than the private _object_name.
    uris = [
        "s3://" + bucket_name + "/" + obj.object_name
        for obj in objects
        if obj.object_name.endswith(extention)
    ]
    if not uris:
        raise Exception("Dataset is empty!")
    # Callers receive a bare string for a single match and a list otherwise.
    return uris[0] if len(uris) == 1 else uris

def select_columns(df, colonne: str = None):
    """Return ``df`` restricted to the columns named in ``colonne``.

    Args:
        df: DataFrame to select from.
        colonne: Comma-separated column names. Whitespace around each name is
            ignored and empty entries are skipped. ``None``, or a string that
            contains no names at all, means "keep every column".

    Returns:
        ``df`` itself when no column is requested, otherwise ``df`` indexed by
        the list of requested names, in the order given.

    Raises:
        KeyError: if a requested column is not present in ``df``.
    """
    if colonne is None:
        return df

    # Build the cleaned list in one pass instead of rewriting the list being iterated.
    selected_columns = [name.strip() for name in colonne.split(',') if name.strip()]
    if not selected_columns:
        return df
    return df[selected_columns]


def load_from_minio(args):
    """Read the input CSV dataset from MinIO and apply the column selection.

    Args:
        args: Parsed command-line namespace; the ``input_*`` attributes are read.

    Returns:
        The result of ``select_columns`` on the loaded data.
    """
    # Credentials and endpoint forwarded to pandas for the s3:// read.
    s3_options = dict(
        key=args.input_access_key,
        secret=args.input_secret_key,
        client_kwargs=dict(endpoint_url=args.input_minio_url),
    )

    csv_location = minio_ls(
        args.input_minio_url,
        args.input_access_key,
        args.input_secret_key,
        args.input_minio_bucket,
        args.input_dataset,
        ".csv",
        args.input_use_ssl,
    )

    # sep=None with the python engine lets pandas detect the delimiter.
    loaded = pd.read_csv(csv_location, storage_options=s3_options, sep=None, engine='python')
    return select_columns(loaded, colonne=args.input_columns)


df = load_from_minio(args)

def save_dataset_to_minio(df, args):
    """Write ``df`` as ``output-dataset.csv`` inside the output dataset folder on MinIO.

    Args:
        df: DataFrame to save.
        args: Parsed command-line namespace; the ``output_*`` attributes are read.
    """
    endpoint = args.output_minio_url
    bucket = args.output_minio_bucket
    file_path = f"s3://{bucket}/{args.output_dataset}/output-dataset.csv"
    print(f"[TO_CSV] path={file_path} endpoint={endpoint} bucket={bucket}")

    # Credentials and endpoint forwarded to pandas for the s3:// write.
    s3_options = dict(
        key=args.output_access_key,
        secret=args.output_secret_key,
        client_kwargs=dict(endpoint_url=endpoint),
    )
    df.to_csv(file_path, storage_options=s3_options, index=False)


save_dataset_to_minio(df, args)

Metamodel

{
    "name": "example",
    "description": "example of input and output data",
    "mode": "BATCH",
    "area": "PREPARATION",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/example:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "framework": {
        "id": 6,
        "name": "Python",
        "version": "3",
        "imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
    },
    "properties": [
        {
            "description": "Your input dataset",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your output dataset",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-dataset",
            "valueType": "STRING",
            "invisible": true
        }
    ]
}

Es. 2: Input e Output di Tipo Stream su Kafka

Diagramma

Codice Programma Nucleo

import argparse
from kafka import KafkaConsumer, KafkaProducer
from arguments import args

# CLI Arguments definition
parser = argparse.ArgumentParser()

# Input stream: topic name and comma-separated list of Kafka brokers.
parser.add_argument('--input-dataset', dest='input_topic', type=str, required=True)
parser.add_argument('--input-dataset.kafka_brokers', dest='input_kafka_brokers', type=str, required=True)

# Output stream: topic name and comma-separated list of Kafka brokers.
parser.add_argument('--output-dataset', dest='output_topic', type=str, required=True)
parser.add_argument('--output-dataset.kafka_brokers', dest='output_kafka_brokers', type=str, required=True)

# parse_known_args does not fail on undeclared options: they are collected in `unknown`.
# This assignment also replaces the `args` name imported from `arguments` above.
args, unknown = parser.parse_known_args()

# Business logic
# Consumer subscribed to the input topic; the broker string is split on commas.
consumer = KafkaConsumer(
  args.input_topic,
  bootstrap_servers=args.input_kafka_brokers.split(",")
)

# Producer connected to the output brokers; the topic is chosen at send time.
producer = KafkaProducer(
  bootstrap_servers=args.output_kafka_brokers.split(",")
)

def _transform_data(input_data):
    # Here the service logic has to be implemented
    return input_data 

# Forward every consumed message, after transformation, to the output topic.
for message in consumer:
  producer.send(args.output_topic, _transform_data(message.value))

Metamodel

{
    "name": "consume-and-publish",
    "description": "Consume and publish example",
    "mode": "BATCH",
    "area": "PREPARATION",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/consume-and-publish:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "framework": {
        "id": 6,
        "name": "Python",
        "version": "3",
        "imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
    },
    "properties": [
        {
            "defaultValue": null,
            "description": "The input channel where to read the text.",
            "extra": {
                "datasetType": null,
                "mode": "streaming"
            },
            "invisible": true,
            "key": "input-dataset",
            "mandatory": true,
            "type": "application",
            "valueType": "STRING"
        },
        {
            "defaultValue": null,
            "description": "The output channel to publish to.",
            "extra": {
                "datasetType": null,
                "mode": "streaming"
            },
            "invisible": true,
            "key": "output-dataset",
            "mandatory": true,
            "type": "application",
            "valueType": "STRING"
        }
    ]
}

Es. 3: Input Dataset e Output Model su MinIO

Diagramma

Codice Programma Nucleo

import argparse
from sklearn.model_selection import train_test_split
from sklearn.cluster import KMeans
import joblib
import os
from minio import Minio

def str2bool(v):
    """Convert a command-line value to a boolean.

    Meant to be used as an ``argparse`` ``type=`` callable.

    Args:
        v: Either an actual ``bool`` (returned unchanged) or a string.

    Returns:
        ``True`` for 'yes', 'true', 't', 'y', '1' and ``False`` for
        'no', 'false', 'f', 'n', '0' or the empty string. Matching is
        case-insensitive and ignores surrounding whitespace.

    Raises:
        argparse.ArgumentTypeError: if the text is not one of the accepted
            spellings. Returning ``None`` here would silently yield a
            non-boolean flag value.
    """
    if isinstance(v, bool):
        return v
    normalized = v.strip().lower()
    if normalized in ('yes', 'true', 't', 'y', '1'):
        return True
    if normalized in ('no', 'false', 'f', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError(f"Boolean value expected, got {v!r}")

# Command-line interface of the service: every option below is registered on this parser.
parser = argparse.ArgumentParser()

# Input dataset: its folder plus the MinIO connection settings used to read it.
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket.minIO_SECRET_KEY', dest='input_secret_key', type=str, required=True)
parser.add_argument('--input-dataset.use_ssl', dest='input_use_ssl', type=str2bool, required=True)

# Optional comma-separated list of column names to keep.
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)

# Output model: destination object name plus the MinIO connection settings used to upload it.
parser.add_argument('--output-model', dest='output_model', type=str, required=True)
parser.add_argument('--output-model.minio_bucket', dest='output_minio_bucket', type=str, required=True)
parser.add_argument('--output-model.minIO_URL', dest='output_minio_url', type=str, required=True)
parser.add_argument('--output-model.minIO_ACCESS_KEY', dest='output_access_key', type=str, required=True)
parser.add_argument('--output-model.minio_bucket.minIO_SECRET_KEY', dest='output_secret_key', type=str, required=True)
parser.add_argument('--output-model.use_ssl', dest='output_use_ssl', type=str2bool, required=True)

# Fraction of the rows held out by train_test_split; no dest is given, so it is read as args.test_split.
parser.add_argument("--test_split", help="Test fraction from 0 to 1.", type=float, default=0.2)

# parse_known_args does not fail on undeclared options: they are collected in `unknown`.
args, unknown = parser.parse_known_args()

def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List the objects under a MinIO folder whose name ends with an extension.

    Args:
        address: MinIO endpoint; a leading ``http://`` / ``https://`` is removed
            before it is handed to the client.
        access_key: MinIO access key.
        secret_key: MinIO secret key.
        bucket_name: Bucket to list.
        folder: Folder (key prefix) inside the bucket. A leading slash is
            dropped and a trailing slash is added when missing; an empty
            folder is left empty so the bucket root is listed.
        extention: Suffix the object names must end with (e.g. ``".csv"``).
        use_ssl: Passed to the client as ``secure``.

    Returns:
        A single ``s3://bucket/key`` string when exactly one object matches,
        or a list of such strings when several match.

    Raises:
        Exception: if no object matches.
    """
    # Normalize the prefix. Guarding on emptiness avoids an IndexError when
    # the folder is "" or "/".
    if folder.startswith("/"):
        folder = folder[1:]
    if folder and not folder.endswith("/"):
        folder = folder + "/"

    endpoint = address.replace("http://", "").replace("https://", "")
    client = Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl,
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)

    # Use the public object_name attribute rather than the private _object_name.
    uris = [
        "s3://" + bucket_name + "/" + obj.object_name
        for obj in objects
        if obj.object_name.endswith(extention)
    ]
    if not uris:
        raise Exception("Dataset is empty!")
    # Callers receive a bare string for a single match and a list otherwise.
    return uris[0] if len(uris) == 1 else uris


def upload_file_to_minio(minio_url, access_key, secret_key, bucket_name, object_name, local_file_path, use_ssl=False):
  """Upload a local file to a MinIO bucket under the given object name.

  Args:
    minio_url: MinIO endpoint; ``http://`` / ``https://`` is removed before use.
    access_key: MinIO access key.
    secret_key: MinIO secret key.
    bucket_name: Destination bucket.
    object_name: Key the file is stored under.
    local_file_path: Path of the file to upload.
    use_ssl: Passed to the client as ``secure``.
  """
  # The client is given the endpoint without its URL scheme.
  endpoint = minio_url
  for scheme in ("http://", "https://"):
    endpoint = endpoint.replace(scheme, "")

  minio_client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=use_ssl)
  minio_client.fput_object(bucket_name, object_name, local_file_path)

# pandas is required by read_csv below and is not imported at the top of this script.
import pandas as pd

# Load the input dataset
storage_options = {
  'key': args.input_access_key,
  'secret': args.input_secret_key,
  'client_kwargs': {
    'endpoint_url': f'{args.input_minio_url}'
  }
}

# Forward the parsed --input-dataset.use_ssl flag so the listing uses the
# transport setting the service was launched with (it defaulted to False before).
file_path = minio_ls(args.input_minio_url, args.input_access_key, args.input_secret_key, args.input_minio_bucket, args.input_dataset, ".csv", args.input_use_ssl)

# sep=None with the python engine lets pandas detect the delimiter.
dataset = pd.read_csv(file_path, storage_options=storage_options, sep=None, engine='python')

# Keep only the requested columns, if any were given.
if args.input_columns is not None:
    selected_columns = [c.strip() for c in args.input_columns.split(",")]
    df = dataset[selected_columns]
else:
    df = dataset.copy()

# Split the dataset into training and testing sets
# Only the training set is used here
df = train_test_split(df, test_size=args.test_split, random_state=42)[0]

# Extract the feature matrix from the dataframe
X = df.values

# Initialize and train a KMeans clustering model
kmeans = KMeans(n_clusters=3, random_state=42)
kmeans.fit(X)

# Save the trained model locally as a .pkl file
model_folder = "mymodel"
os.makedirs(model_folder, exist_ok=True)
model_local_path = os.path.join(model_folder, "kmeans_model.pkl")
joblib.dump(kmeans, model_local_path)

# Upload to MinIO
# NOTE(review): the object is stored under the key args.output_model itself, not under
# "<output_model>/kmeans_model.pkl" — confirm this is the layout model consumers expect.
upload_file_to_minio(args.output_minio_url, args.output_access_key, args.output_secret_key, args.output_minio_bucket, args.output_model, model_local_path, args.output_use_ssl)

Metamodel

{
    "name": "kmeans-clustering",
    "description": "Kmeans clustering model",
    "mode": "BATCH",
    "area": "ANALYSIS",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/kmeans-clustering:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "framework": {
        "id": 6,
        "name": "Python",
        "version": "3",
        "imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
    },
    "properties": [
        {
            "description": "Your input dataset description.",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your model description",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-model",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Test fraction from 0 to 1",
            "mandatory": true,
            "type": "application",
            "defaultValue": 0.2,
            "key": "test_split",
            "valueType": "DOUBLE"
        }
    ]
}

Es. 4: Input Dataset + Input Model + Output Model

Diagramma

service-example-no-4

Codice Programma Nucleo

import argparse
import pandas as pd
from arguments import args
from joblib import load
from utils import upload_file_to_minio, load_first_from_minio, minio_ls
from model import predict

# Command-line interface of the service: every option below is registered on this parser.
parser = argparse.ArgumentParser()

# Input dataset: its folder plus the MinIO connection settings used to read it.
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_dataset_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_dataset_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket.minIO_SECRET_KEY', dest='input_dataset_secret_key', type=str, required=True)

# Input model: its folder plus the MinIO connection settings used to download it.
parser.add_argument('--input-model', dest='input_model', type=str, required=True)
parser.add_argument('--input-model.minio_bucket', dest='input_model_minio_bucket', type=str, required=True)
parser.add_argument('--input-model.minIO_URL', dest='input_model_minio_url', type=str, required=True)
parser.add_argument('--input-model.minIO_ACCESS_KEY', dest='input_model_access_key', type=str, required=True)
parser.add_argument('--input-model.minio_bucket.minIO_SECRET_KEY', dest='input_model_secret_key', type=str, required=True)

# Output dataset: destination folder plus the MinIO connection settings used to write it.
parser.add_argument('--output-dataset', dest='output_dataset', type=str, required=True)
parser.add_argument('--output-dataset.minio_bucket', dest='output_minio_bucket', type=str, required=True)
parser.add_argument('--output-dataset.minIO_URL', dest='output_minio_url', type=str, required=True)
parser.add_argument('--output-dataset.minIO_ACCESS_KEY', dest='output_access_key', type=str, required=True)
parser.add_argument('--output-dataset.minio_bucket.minIO_SECRET_KEY', dest='output_secret_key', type=str, required=True)

# Name of the column added to the output with the model predictions.
# (The original line separated dest and help with "." instead of ",", a syntax error.)
parser.add_argument("--predColName", dest='predColName', help="Name of the column containing the predictions", type=str, default="prediction")

# parse_known_args does not fail on undeclared options: they are collected in `unknown`.
# This assignment also replaces the `args` name imported from `arguments` above.
args, unknown = parser.parse_known_args()

def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
  """List the objects under a MinIO folder whose name ends with an extension.

  Args:
    address: MinIO endpoint; a leading ``http://`` / ``https://`` is removed
      before it is handed to the client, so the same URL can also be used as
      the ``endpoint_url`` given to pandas.
    access_key: MinIO access key.
    secret_key: MinIO secret key.
    bucket_name: Bucket to list.
    folder: Folder (key prefix) inside the bucket. A leading slash is dropped
      and a trailing slash is added when missing; an empty folder is left
      empty so the bucket root is listed.
    extention: Suffix the object names must end with (e.g. ``".csv"``).
    use_ssl: Passed to the client as ``secure``.

  Returns:
    A single ``s3://bucket/key`` string when exactly one object matches,
    or a list of such strings when several match.

  Raises:
    Exception: if no object matches.
  """
  # Local import: this listing does not import Minio at module level.
  from minio import Minio

  # Normalize the prefix. Guarding on emptiness avoids an IndexError when
  # the folder is "" or "/".
  if folder.startswith("/"):
    folder = folder[1:]
  if folder and not folder.endswith("/"):
    folder = folder + "/"

  endpoint = address.replace("http://", "").replace("https://", "")
  client = Minio(
    endpoint,
    access_key=access_key,
    secret_key=secret_key,
    secure=use_ssl
  )
  objects = client.list_objects(bucket_name=bucket_name, prefix=folder)

  # Use the public object_name attribute rather than the private _object_name.
  uris = [
    "s3://" + bucket_name + "/" + obj.object_name
    for obj in objects
    if obj.object_name.endswith(extention)
  ]
  if not uris:
    raise Exception("Dataset is empty!")
  # Callers receive a bare string for a single match and a list otherwise.
  return uris[0] if len(uris) == 1 else uris

def load_first_from_minio(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
  """Download the first object under a MinIO folder whose name ends with ``extention``.

  Args:
    address: MinIO endpoint; a leading ``http://`` / ``https://`` is removed
      before it is handed to the client.
    access_key: MinIO access key.
    secret_key: MinIO secret key.
    bucket_name: Bucket to search.
    folder: Folder (key prefix) inside the bucket; a leading slash is dropped
      and a trailing slash is added when missing.
    extention: Suffix the object name must end with (e.g. ``".joblib"``).
    use_ssl: Passed to the client as ``secure``.

  Returns:
    The local path of the downloaded file: the object's base name, relative
    to the current working directory.

  Raises:
    FileNotFoundError: if no object under the folder has the extension.
  """
  # Local imports: this listing does not import os or Minio at module level.
  import os
  from minio import Minio

  if folder.startswith("/"):
    folder = folder[1:]
  if folder and not folder.endswith("/"):
    folder = folder + "/"

  endpoint = address.replace("http://", "").replace("https://", "")
  client = Minio(
    endpoint,
    access_key=access_key,
    secret_key=secret_key,
    secure=use_ssl
  )

  objects = client.list_objects(bucket_name=bucket_name, prefix=folder)

  # Find the first model file
  model_file = next((obj for obj in objects if obj.object_name.endswith(extention)), None)
  if model_file is None:
    # Fail with a clear message instead of an AttributeError on None below.
    raise FileNotFoundError(f"No '{extention}' file found in {bucket_name}/{folder}")

  # The file is saved locally under the object's base name
  local_file_path = os.path.basename(model_file.object_name)

  # Download the file
  client.fget_object(bucket_name, model_file.object_name, local_file_path)
  return local_file_path

def upload_file_to_minio(minio_url, access_key, secret_key, bucket_name, object_name, local_file_path, use_ssl=False):
  """Upload a local file to a MinIO bucket under the given object name.

  Args:
    minio_url: MinIO endpoint; a leading ``http://`` / ``https://`` is removed
      before it is handed to the client.
    access_key: MinIO access key.
    secret_key: MinIO secret key.
    bucket_name: Destination bucket.
    object_name: Key the file is stored under.
    local_file_path: Path of the file to upload.
    use_ssl: Passed to the client as ``secure``. Defaults to ``False``, which
      is what this function always used before the parameter existed.
  """
  # Local import: this listing does not import Minio at module level.
  from minio import Minio

  # Initialize the MinIO client
  endpoint = minio_url.replace("http://", "").replace("https://", "")
  client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=use_ssl)

  # Upload the file
  client.fput_object(bucket_name, object_name, local_file_path)

def predict():
  """Load the input model and dataset from MinIO and append the predictions.

  Reads the module-level ``args`` namespace: the ``input_model_*`` settings
  locate the ``.joblib`` model, the ``input_dataset_*`` settings locate the
  CSV dataset, and ``predColName`` names the new column.

  Returns:
    The loaded dataset with one extra column holding the model predictions.
  """
  model_path = load_first_from_minio(args.input_model_minio_url, args.input_model_access_key, args.input_model_secret_key, args.input_model_minio_bucket, args.input_model, ".joblib")
  knn = load(model_path)

  storage_options = {
      'key': args.input_dataset_access_key,
      'secret': args.input_dataset_secret_key,
      'client_kwargs': {
          # The parser stores this option as input_dataset_minio_url;
          # args.input_minio_url does not exist and raised AttributeError.
          'endpoint_url': f'{args.input_dataset_minio_url}'
      }
  }
  file_path = minio_ls(args.input_dataset_minio_url, args.input_dataset_access_key, args.input_dataset_secret_key, args.input_dataset_minio_bucket, args.input_dataset, ".csv")

  # sep=None with the python engine lets pandas detect the delimiter.
  dataset = pd.read_csv(file_path, storage_options=storage_options, sep=None, engine='python')
  result = knn.predict(dataset)

  # Wrap the predictions in a one-column frame and copy that column into the dataset.
  result = pd.DataFrame(result, columns=[args.predColName])
  dataset[args.predColName] = result[args.predColName]

  return dataset



df = predict()

# Destination of the prediction CSV inside the output dataset folder.
file_path = f"s3://{args.output_minio_bucket}/{args.output_dataset}/dataset.csv"

# Credentials and endpoint forwarded to pandas for the s3:// write.
storage_options = dict(
    key=args.output_access_key,
    secret=args.output_secret_key,
    client_kwargs=dict(endpoint_url=f'{args.output_minio_url}'),
)

df.to_csv(file_path, index=False, storage_options=storage_options)

Metamodel

{
    "name": "knn-predict",
    "description": "Knn prediction model",
    "mode": "BATCH",
    "area": "ANALYSIS",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/knn-predict:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "framework": {
        "id": 6,
        "name": "Python",
        "version": "3",
        "imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
    },
    "properties": [
        {
            "description": "Your input dataset description.",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your input model description",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-model",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your output dataset",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-dataset",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Name of the column containing the predictions",
            "mandatory": true,
            "type": "application",
            "defaultValue": "prediction",
            "key": "predColName",
            "valueType": "STRING"
        }
    ]
}

Es. 5: Prediction Input Stream + Input Model + Output Stream

Diagramma

service-example-no-5

Codice Programma Nucleo

import argparse
import os
from pathlib import Path

import dill
import joblib
import pickle

import numpy as np
import pandas as pd
from alibi.explainers import AnchorTabular

# Command-line interface of the service: every option below is registered on this parser.
parser = argparse.ArgumentParser()

# Input stream: topic name and comma-separated list of Kafka brokers.
parser.add_argument('--input-dataset', dest='input_topic', type=str, required=True)
parser.add_argument('--input-dataset.kafka_brokers', dest='input_kafka_brokers', type=str, required=True)

# Output stream: topic name and comma-separated list of Kafka brokers.
parser.add_argument('--output-dataset', dest='output_topic', type=str, required=True)
parser.add_argument('--output-dataset.kafka_brokers', dest='output_kafka_brokers', type=str, required=True)

# Input model: its folder plus the MinIO connection settings used to download it.
parser.add_argument('--input-model', dest='input_model', type=str, required=True)
parser.add_argument('--input-model.minio_bucket', dest='input_model_minio_bucket', type=str, required=True)
parser.add_argument('--input-model.minIO_URL', dest='input_model_minio_url', type=str, required=True)
parser.add_argument('--input-model.minIO_ACCESS_KEY', dest='input_model_access_key', type=str, required=True)
parser.add_argument('--input-model.minio_bucket.minIO_SECRET_KEY', dest='input_model_secret_key', type=str, required=True)

# The help texts below were attached to the wrong options: the description of the
# explainer arguments sat on --feature_to_predict, and the other one referred to
# KNeighborsClassifier.fit, which this service never calls.
parser.add_argument("--feature_to_predict", dest='feature_to_predict', help="Name of the predicted feature; 'predicted_label' is used when omitted.", required=False, type=str)
# NOTE(review): the default '{"n_neighbors":3}' looks inherited from a KNN service — confirm
# whether an explainer-specific default (or '{}') is intended.
parser.add_argument("--direct_args_to_explainer_function", dest='direct_args_to_explainer_function', help="JSON object of keyword arguments injected directly into the explain method of the Anchor Tabular explainer.", type=str, default='{"n_neighbors":3}')

# parse_known_args does not fail on undeclared options: they are collected in `unknown`.
args, unknown = parser.parse_known_args()

def load_first_from_minio(address, access_key, secret_key, bucket_name, folder, use_ssl=False):
  """Download the first model file found under a MinIO folder.

  A model file is an object whose name ends with ``.joblib``, ``.sav`` or ``.pkl``.

  Args:
    address: MinIO endpoint; a leading ``http://`` / ``https://`` is removed
      before it is handed to the client.
    access_key: MinIO access key.
    secret_key: MinIO secret key.
    bucket_name: Bucket to search.
    folder: Folder (key prefix) inside the bucket; a leading slash is dropped
      and a trailing slash is added when missing.
    use_ssl: Passed to the client as ``secure``.

  Returns:
    The local path of the downloaded file: the object's base name, relative
    to the current working directory.

  Raises:
    FileNotFoundError: if the folder contains no model file.
  """
  # Local import: this listing does not import Minio at module level.
  from minio import Minio

  if folder.startswith("/"):
    folder = folder[1:]
  if folder and not folder.endswith("/"):
    folder = folder + "/"

  endpoint = address.replace("http://", "").replace("https://", "")
  client = Minio(
      endpoint,
      access_key=access_key,
      secret_key=secret_key,
      secure=use_ssl
  )
  objects = client.list_objects(bucket_name=bucket_name, prefix=folder)

  # Find the first model file
  model_extensions = (".joblib", ".sav", ".pkl")
  model_file = next((obj for obj in objects if obj.object_name.endswith(model_extensions)), None)
  if model_file is None:
    # Fail with a clear message instead of an AttributeError on None below.
    raise FileNotFoundError(f"No model file {model_extensions} found in {bucket_name}/{folder}")

  # The file is saved locally under the object's base name
  local_file_path = os.path.basename(model_file.object_name)

  # Download the file
  client.fget_object(bucket_name, model_file.object_name, local_file_path)
  return local_file_path

def find_folder(parent_path, folder_name):
  """
  Recursively searches for a folder with the specified name inside the given parent directory.
  Only directories (not files) are considered.

  :param parent_path: Path to the root directory where the search should begin.
  :param folder_name: Name of the folder to find.
  :return: Full path to the first matching folder, in ``os.walk`` order.
  :raises FileNotFoundError: If ``parent_path`` does not exist, or if no folder
    named ``folder_name`` is found under it.
  :raises NotADirectoryError: If ``parent_path`` exists but is not a directory.
  """
  if not os.path.exists(parent_path):
    raise FileNotFoundError(f"The directory '{parent_path}' does not exist.")

  if not os.path.isdir(parent_path):
    raise NotADirectoryError(f"The path '{parent_path}' is not a directory.")

  for root, dirs, _ in os.walk(parent_path):
    if folder_name in dirs:
      return os.path.join(root, folder_name)

  # Honor the documented contract: previously the function fell through and returned None.
  raise FileNotFoundError(f"No folder named '{folder_name}' found under '{parent_path}'.")

def download_folder_from_minio(endpoint, access_key, secret_key, bucket, path, destination, secure=False):
    """Download every object under a MinIO prefix into a local directory.

    Each object is written below ``destination`` at its path relative to the
    prefix; parent directories are created as needed. One line is printed per
    downloaded object, and a notice is printed when the prefix has no objects.

    Args:
        endpoint: MinIO endpoint; ``http://`` / ``https://`` is removed before use.
        access_key: MinIO access key.
        secret_key: MinIO secret key.
        bucket: Bucket to read from.
        path: Prefix to download; a leading slash is dropped and a trailing
            slash is added when the prefix is non-empty.
        destination: Local directory receiving the files.
        secure: Passed to the client as ``secure``.
    """
    host = endpoint.replace("http://", "").replace("https://", "")
    client = Minio(host, access_key=access_key, secret_key=secret_key, secure=secure)

    # Normalize the prefix
    if path.startswith("/"):
        path = path[1:]
    if path and not path.endswith("/"):
        path += "/"

    downloaded = 0
    for obj in client.list_objects(bucket, prefix=path, recursive=True):
        downloaded += 1
        # Path of the object relative to the prefix
        rel_path = obj.object_name[len(path):].lstrip("/")
        local_path = os.path.join(destination, rel_path)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        client.fget_object(bucket, obj.object_name, local_path)
        print(f"Downloaded {obj.object_name} -> {local_path}", flush=True)

    if downloaded == 0:
        print(f"Nessun file trovato in {bucket}/{path}", flush=True)

def build_predict_fn(model):
  """Wrap ``model.predict`` in a single-argument prediction function.

  When the model exposes ``feature_names_in_``, the input is first wrapped in
  a DataFrame with those names as columns; otherwise it is passed to
  ``predict`` as is, and a notice is printed.

  Args:
    model: Object exposing a ``predict`` method.

  Returns:
    A callable taking the input data and returning ``model.predict`` output.

  Raises:
    AttributeError: if ``model`` has no ``predict`` attribute.
  """
  if not hasattr(model, 'predict'):
    raise AttributeError("The provided model does not have a 'predict' method.")

  if not hasattr(model, 'feature_names_in_'):
    print('motore addestrato senza features', flush=True)

    def predict_raw(x):
      return model.predict(x)

    return predict_raw

  def predict_named(x):
    # Column names are read at call time, as the model may be refitted.
    frame = pd.DataFrame(x, columns=model.feature_names_in_.tolist())
    return model.predict(frame)

  return predict_named

def load_anchor_explainer(explainer_folder_path: str) -> AnchorTabular:
  """
  Load an AnchorTabular explainer saved with dill from a folder, restoring
  its sampler when the companion files are present.

  The folder is expected to contain ``explainer.dill`` and, optionally,
  ``discretizer.dill`` and ``samplers.dill``. When both optional files exist,
  the discretizer is attached to the loaded sampler and the sampler replaces
  the explainer's ``samplers`` list. When only the discretizer exists, a
  warning is printed and the explainer is returned as loaded.

  :param explainer_folder_path: path to the folder holding the dill files
  :return: the loaded AnchorTabular instance
  :raises FileNotFoundError: if ``explainer.dill`` is missing
  """
  explainer_file = Path(explainer_folder_path) / 'explainer.dill'
  sampler_file = Path(explainer_folder_path) / 'samplers.dill'
  discretizer_file = Path(explainer_folder_path) / 'discretizer.dill'

  if not explainer_file.exists():
    raise FileNotFoundError(f"Explainer file not found at {explainer_file}")

  # SECURITY: dill.load executes arbitrary code from the file; only load
  # explainer artifacts that come from a trusted source.
  with open(explainer_file, 'rb') as f:
    explainer: AnchorTabular = dill.load(f)

  # The sampler is restored separately, and only when a discretizer was saved.
  if not discretizer_file.exists():
    return explainer

  if not sampler_file.exists():
    # The warning sign was mis-encoded ("âš ") in the original message.
    print("⚠ Attenzione: samplers.dill non trovato. L'explainer potrebbe non funzionare correttamente.", flush=True)
    return explainer

  with open(discretizer_file, 'rb') as f:
    discretizer = dill.load(f)
  with open(sampler_file, 'rb') as f:
    samplers = dill.load(f)
  samplers.disc = discretizer
  explainer.samplers = [samplers]
  print("✅ Samplers ricaricati correttamente dall'explainer.", flush=True)

  return explainer

def extract_imputer_values_from_explainer(explainer) -> dict:
    """
    Build a single {feature_name: fill_value} mapping from a fitted explainer.

    - Numerical features: the middle element of the ``qts`` values stored in
      the discretizer for that feature (an approximation of the median).
    - Categorical features: the first value listed in the sampler's
      ``feature_values`` for that feature.

    Features with no usable values are left out of the mapping.

    Returns:
        Dict[str, Any]: mapping {column_name: fill_value}
    """
    sampler = explainer.samplers[0]
    discretizer = sampler.disc
    names = explainer.feature_names

    fill_values = {}

    # Numerical features: middle element of the stored qts values
    for idx in explainer.numerical_features:
        if idx not in discretizer.lambdas:
            continue
        keywords = discretizer.lambdas[idx].keywords
        if 'qts' not in keywords:
            continue
        qts = keywords['qts']
        if len(qts) > 0:
            fill_values[names[idx]] = qts[len(qts) // 2]

    # Categorical features: first known value
    for idx in sampler.categorical_features:
        if idx not in sampler.feature_values:
            continue
        known_values = sampler.feature_values[idx]
        if len(known_values) > 0:
            fill_values[names[idx]] = known_values[0]

    return fill_values

def get_anchors_results_with_prediction(sample_to_explain, fitted_anchors: AnchorTabular, explain_kwargs, target_class: str):
  """Compute the anchor explanation and the prediction for one sample.

  Both steps are best-effort: if either fails, the error is printed and the
  corresponding part of the result is left empty instead of raising.

  Args:
    sample_to_explain: Sample to explain; it is reshaped to ``(1, -1)`` for
      the prediction.
    fitted_anchors: Explainer used for both the explanation and the prediction.
    explain_kwargs: Keyword arguments forwarded to ``fitted_anchors.explain``.
    target_class: Value returned under the ``'name'`` key.

  Returns:
    A dict with ``'name'`` (``target_class``), ``'value'`` (the prediction as
    a string, ``'None'`` on failure) and ``'explanation'`` (the anchor rules
    joined with ``' AND '``, empty when no explanation is available).
  """
  anch_explanation = None
  try:
    encoded_instance = encode_categorical_features(sample_to_explain, fitted_anchors)
    anch_explanation = fitted_anchors.explain(encoded_instance, **explain_kwargs)['data']
  except Exception as e:
    print('While getting anchors results, an error occured:', flush=True)
    print(e, flush=True)
    anch_explanation = None

  prediction = None
  try:
    single_row = sample_to_explain.reshape(1, -1)
    prediction = fitted_anchors.predictor(single_row)[0]
  except Exception as e:
    print('While getting prediction, an error occured:', flush=True)
    print(e, flush=True)
    prediction = None

  if anch_explanation:
    anchor_text = ' AND '.join(anch_explanation.get('anchor', []))
  else:
    anchor_text = ''

  result = {}
  result['name'] = target_class
  result['value'] = str(prediction)
  result['explanation'] = anchor_text
  return result

import pandas as pd
from arguments import args
import pickle
import joblib
from utils import load_first_from_minio, minio_ls
from utils import *
from kafka import KafkaConsumer, KafkaProducer

# Resolve the first ".joblib" object of the input model via the project helper.
# NOTE(review): the result is used below as a path readable by joblib/open —
# confirm that load_first_from_minio returns a local file path.
model_path_str = load_first_from_minio(args.input_model_minio_url, args.input_model_access_key, args.input_model_secret_key, args.input_model_minio_bucket, args.input_model, ".joblib")
model_to_explain = None

try:
  model_to_explain = joblib.load(model_path_str)
except Exception:
  # joblib could not read the file: fall back to plain pickle.
  # NOTE: unpickling executes arbitrary code; only load models from a trusted bucket.
  try:
    # Open the model *path*; model_to_explain is still None at this point.
    with open(model_path_str, "rb") as f:
      model_to_explain = pickle.load(f)
  except Exception as e:
    raise RuntimeError(f"Failed to load model {model_path_str}: {e}") from e


# Column names the model was fitted on, when the model exposes them.
try:
  features_name_for_model = model_to_explain.feature_names_in_.tolist()
except AttributeError:
  features_name_for_model = None

predict_fn = build_predict_fn(model_to_explain)

# Download the model folder locally and look for the saved explainer inside it.
explainer_remote_path = '.resources/input_model'
os.makedirs(explainer_remote_path, exist_ok=True)
download_folder_from_minio(args.input_model_minio_url, args.input_model_access_key, args.input_model_secret_key, args.input_model_minio_bucket, args.input_model, explainer_remote_path, secure=False)
explainer_path = find_folder(explainer_remote_path, 'explainer_model')

explainer = load_anchor_explainer(explainer_path)
# TODO: check whether this step is still needed
explainer.reset_predictor(predict_fn)

# Load the fill values used to impute missing data consistently with the explainer
impute_dict = extract_imputer_values_from_explainer(explainer)

# The property is optional: json.loads(None) raises TypeError, so only parse
# when a value was actually supplied.
if args.direct_args_to_explainer_function:
  direct_args_to_explainer_function = json.loads(args.direct_args_to_explainer_function)
else:
  direct_args_to_explainer_function = None

# Covers both a missing property and an explicit JSON null.
if direct_args_to_explainer_function is None: direct_args_to_explainer_function = {}
if 'threshold' not in direct_args_to_explainer_function: direct_args_to_explainer_function['threshold'] = 0.7

if args.feature_to_predict is None: args.feature_to_predict = 'predicted_label'

# Local import so this block does not depend on `from utils import *` for json.
import json

consumer = KafkaConsumer(
  args.input_topic,
  bootstrap_servers=args.input_kafka_brokers.split(",")
)

producer = KafkaProducer(
  bootstrap_servers=args.output_kafka_brokers.split(",")
)

# KafkaConsumer is consumed by iterating it; every item carries the raw
# payload in `.value` because no value_deserializer is configured above.
for kafka_record in consumer:
  # assumes payloads are UTF-8 encoded JSON — TODO confirm against the upstream producer
  try:
    message = json.loads(kafka_record.value)
  except (TypeError, ValueError):
    # Undecodable payload: handled by the "invalid message" branch below.
    message = None

  if isinstance(message, dict):
    records = [message]
  elif isinstance(message, list) and all(isinstance(el, dict) for el in message):
    records = message
  else:
    print("⚠ Messaggio non valido (né dict né lista di dict), ignorato.")
    continue

  df_input = pd.DataFrame(records)

  # Add features required by the model but absent from the message as NaN,
  # then keep only the model's columns, in the model's order.
  if features_name_for_model is not None:
    missing_features = [col for col in features_name_for_model if col not in df_input.columns]
    for col in missing_features:
        df_input[col] = np.nan
    df_input = df_input[features_name_for_model]

  # Impute missing values
  df_input.fillna(impute_dict, inplace=True)

  # Row by row: prediction + explanation.
  # The default integer index of df_input lines up with positions in `records`.
  for i, row in df_input.iterrows():
    input_array = row.to_numpy(dtype=object)

    try:
      anchor_explain_dict = get_anchors_results_with_prediction(
        sample_to_explain=input_array,
        fitted_anchors=explainer,
        # Use the parsed dict (with the default threshold), not the raw JSON string in args.
        explain_kwargs=direct_args_to_explainer_function,
        target_class=args.feature_to_predict
      )

      # Append to the original record WITHOUT touching its other keys
      record = records[i]
      record.setdefault('predictions', []).append(anchor_explain_dict)
    except Exception as e:
      print('Error of elaboration messages:', flush=True)
      print(e, flush=True)

  # The producer has no value_serializer, so the value must already be bytes.
  producer.send(args.output_topic, json.dumps(records).encode('utf-8'))

Metamodel

{
    "accessLevel": "PUBLIC",
    "area": "ANALYSIS",
    "description": "Anchors explainer for tabular data",
    "framework": {
        "id": 6,
        "imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png",
        "name": "Python",
        "version": "3"
    },
    "mode": "BATCH",
    "name": "anchors-tabular-predict",
    "properties": [

        {
            "defaultValue": null,
            "description": "The feature to predict. Use for multiple predictions",
            "invisible": false,
            "key": "feature_to_predict",
            "mandatory": false,
            "type": "application",
            "valueType": "STRING"
        },
        {
            "defaultValue": null,
            "description": "These arguments will be injected directly into the explain method of the Anchor Tabular explainer.",
            "invisible": false,
            "key": "direct_args_to_explainer_function",
            "mandatory": false,
            "type": "application",
            "valueType": "JSON"
        },
        {
            "defaultValue": null,
            "description": "Your input model description.",
            "inputData": false,
            "invisible": true,
            "key": "input-model",
            "mandatory": true,
            "type": "application",
            "valueType": "STRING"
        },
        {
            "defaultValue": null,
            "description": "Data with prediction and explanations",
            "extra": {
                "datasetType": null,
                "mode": "streaming"
            },
            "invisible": true,
            "key": "output-dataset",
            "mandatory": true,
            "type": "application",
            "valueType": "STRING"
        },
        {
            "defaultValue": null,
            "description": "Data to be explained.",
            "extra": {
                "datasetType": null,
                "mode": "streaming"
            },
            "invisible": true,
            "key": "input-dataset",
            "mandatory": true,
            "type": "application",
            "valueType": "STRING"
        }
    ],
    "url": "docker://dockerhub.alidalab.it/alida/restricted/services/anchors-tabular-predict:1.1.0",
    "version": "1.1.0"
}

Es. 6: Input Dataset Multipli + Output Dataset Multipli

Diagramma

Codice Programma Nucleo

import argparse
from minio import Minio
from arguments import args
from utils import upload_dataset, fetch_dataset
import pandas as pd

# Command-line contract of the service: three input datasets and three output
# datasets, each described by a path plus its MinIO connection options.
parser = argparse.ArgumentParser()

for _n in (1, 2, 3):
  _in_flag, _in_dest = f'--input-dataset-{_n}', f'input_dataset_{_n}'
  _out_flag, _out_dest = f'--output-dataset-{_n}', f'output_dataset_{_n}'

  # (option string, attribute name on `args`), registered in the same order
  # as a hand-written list: input N first, then output N.
  _options = [
    (_in_flag, _in_dest),
    (f'{_in_flag}.minio_bucket', f'{_in_dest}_minio_bucket'),
    (f'{_in_flag}.minIO_URL', f'{_in_dest}_minio_url'),
    (f'{_in_flag}.minIO_ACCESS_KEY', f'{_in_dest}_access_key'),
    (f'{_in_flag}.minio_bucket.minIO_SECRET_KEY', f'{_in_dest}_secret_key'),
    (_out_flag, f'{_out_dest}_dataset'),
    (f'{_out_flag}.minio_bucket', f'{_out_dest}_minio_bucket'),
    (f'{_out_flag}.minIO_URL', f'{_out_dest}_minio_url'),
    (f'{_out_flag}.minIO_ACCESS_KEY', f'{_out_dest}_access_key'),
    (f'{_out_flag}.minIO_SECRET_KEY', f'{_out_dest}_secret_key'),
  ]
  for _flag, _dest in _options:
    parser.add_argument(_flag, dest=_dest, type=str, required=True)

# Options not declared above are collected in `unknown` instead of failing.
args, unknown = parser.parse_known_args()

def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
  """List the objects under a bucket folder whose name ends with an extension.

  Args:
    address: MinIO endpoint handed to the ``Minio`` client.
    access_key: MinIO access key.
    secret_key: MinIO secret key.
    bucket_name: bucket to list.
    folder: prefix to list; a trailing "/" is added when missing. An empty
      string is used as-is as the prefix instead of raising IndexError.
    extention: suffix the object name must end with (e.g. ".csv").
    use_ssl: forwarded to the client as ``secure``.

  Returns:
    A single "s3://bucket/object" string when exactly one object matches,
    a list of such strings when several match.

  Raises:
    Exception: when no object matches.
  """
  # Guard on emptiness first: indexing folder[-1] fails on an empty string.
  if folder and not folder.endswith("/"):
    folder = folder + "/"

  client = Minio(
    address,
    access_key=access_key,
    secret_key=secret_key,
    secure=use_ssl
  )
  objects = client.list_objects(bucket_name=bucket_name, prefix=folder)

  # Use the public `object_name` attribute rather than the private `_object_name`.
  files_list = [obj.object_name for obj in objects if obj.object_name.endswith(extention)]
  if len(files_list) > 1:
    return ["s3://" + bucket_name + "/" + name for name in files_list]
  elif len(files_list) == 1:
    return "s3://" + bucket_name + "/" + files_list[0]
  else:
    raise Exception("Dataset is empty!")

def fetch_dataset(dataset, minio_url, access_key, secret_key, bucket_name, extention):
    """Read a dataset folder stored on MinIO into a single DataFrame.

    Args:
        dataset: folder (prefix) of the dataset inside the bucket.
        minio_url: used both as the MinIO client address and as the S3
            ``endpoint_url`` for pandas.
        access_key: MinIO access key.
        secret_key: MinIO secret key.
        bucket_name: bucket holding the dataset.
        extention: file suffix to look for (e.g. ".csv").

    Returns:
        pandas.DataFrame with the rows of every matching file. Several files
        are concatenated in listing order with a fresh index.

    Raises:
        Exception: propagated from minio_ls when no file matches.
    """
    # NOTE(review): the same value feeds Minio(address) and s3fs endpoint_url;
    # confirm one format satisfies both clients.
    storage_options = {
      'key': access_key,
      'secret': secret_key,
      'client_kwargs': {
        'endpoint_url': f'{minio_url}'
      }
    }

    file_path = minio_ls(minio_url, access_key, secret_key, bucket_name, dataset, extention)

    # minio_ls returns one path or a list of paths; read_csv only accepts one.
    paths = file_path if isinstance(file_path, list) else [file_path]
    # sep=None with the python engine lets pandas sniff the delimiter.
    frames = [
      pd.read_csv(single_path, storage_options=storage_options, sep=None, engine='python')
      for single_path in paths
    ]
    if len(frames) == 1:
        return frames[0]
    return pd.concat(frames, ignore_index=True)

def upload_dataset(df, path, minio_url, access_key, secret_key, bucket_name):
  """Write ``df`` as ``dataset.csv`` under ``path`` in the given bucket.

  The frame is serialized by ``df.to_csv`` without the index column; the
  credentials and the endpoint are passed through ``storage_options``.
  """
  destination = "s3://{}/{}/dataset.csv".format(bucket_name, path)
  s3_settings = dict(
    key=access_key,
    secret=secret_key,
    client_kwargs=dict(endpoint_url="{}".format(minio_url)),
  )
  df.to_csv(destination, storage_options=s3_settings, index=False)

# Read the three inputs. Keyword arguments are used because fetch_dataset
# takes the dataset path FIRST and the bucket second-to-last; the input path
# is stored on `args` as input_dataset_N (no "_dataset" suffix).
input_dataset_1 = fetch_dataset(
  dataset=args.input_dataset_1,
  minio_url=args.input_dataset_1_minio_url,
  access_key=args.input_dataset_1_access_key,
  secret_key=args.input_dataset_1_secret_key,
  bucket_name=args.input_dataset_1_minio_bucket,
  extention=".csv",
)
input_dataset_2 = fetch_dataset(
  dataset=args.input_dataset_2,
  minio_url=args.input_dataset_2_minio_url,
  access_key=args.input_dataset_2_access_key,
  secret_key=args.input_dataset_2_secret_key,
  bucket_name=args.input_dataset_2_minio_bucket,
  extention=".csv",
)
input_dataset_3 = fetch_dataset(
  dataset=args.input_dataset_3,
  minio_url=args.input_dataset_3_minio_url,
  access_key=args.input_dataset_3_access_key,
  secret_key=args.input_dataset_3_secret_key,
  bucket_name=args.input_dataset_3_minio_bucket,
  extention=".csv",
)

# Add a "Status" column holding "Updated <n>" in every row
input_dataset_1['Status'] = 'Updated 1'
input_dataset_2['Status'] = 'Updated 2'
input_dataset_3['Status'] = 'Updated 3'

# Write each frame to its matching output dataset.
upload_dataset(input_dataset_1, args.output_dataset_1_dataset, args.output_dataset_1_minio_url, args.output_dataset_1_access_key, args.output_dataset_1_secret_key, args.output_dataset_1_minio_bucket)
upload_dataset(input_dataset_2, args.output_dataset_2_dataset, args.output_dataset_2_minio_url, args.output_dataset_2_access_key, args.output_dataset_2_secret_key, args.output_dataset_2_minio_bucket)
upload_dataset(input_dataset_3, args.output_dataset_3_dataset, args.output_dataset_3_minio_url, args.output_dataset_3_access_key, args.output_dataset_3_secret_key, args.output_dataset_3_minio_bucket)

Metamodel

{
    "name": "multiple-io-example",
    "description": "Multiple IO example",
    "mode": "BATCH",
    "area": "ANALYSIS",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/multiple-io-example:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "framework": {
        "id": 6,
        "name": "Python",
        "version": "3",
        "imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
    },
    "properties": [
        {
            "description": "Your input dataset 1",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset-1",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns-1",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your input dataset 2",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset-2",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns-2",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your input dataset 3",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset-3",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns-3",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your output dataset 1",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-dataset-1",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your output dataset 2",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-dataset-2",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your output dataset 3",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-dataset-3",
            "valueType": "STRING",
            "invisible": true
        }
    ]
}

Es. 7: Input Dataset + Output Model + Workflow Media

Diagramma

service-example-no-7

Codice Programma Nucleo

import argparse
import os
from minio import Minio
from arguments import args
from kafka import KafkaProducer
import pandas as pd
from arguments import args
from sklearn.neighbors import KNeighborsClassifier
from joblib import dump, load
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay
from utilities import minio_ls, prepare_file_metadata, upload_file_to_minio
from producer import send_message

# Command-line contract of the service: input dataset, output model,
# go_manager (Kafka + MinIO) settings and the training parameters.
parser = argparse.ArgumentParser()

# Input dataset location and MinIO credentials
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_dataset_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_dataset_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket.minIO_SECRET_KEY', dest='input_dataset_secret_key', type=str, required=True)

# Output model location and MinIO credentials
parser.add_argument('--output-model', dest='output_model', type=str, required=True)
parser.add_argument('--output-model.minio_bucket', dest='output_model_minio_bucket', type=str, required=True)
parser.add_argument('--output-model.minIO_URL', dest='output_model_minio_url', type=str, required=True)
parser.add_argument('--output-model.minIO_ACCESS_KEY', dest='output_model_access_key', type=str, required=True)
parser.add_argument('--output-model.minio_bucket.minIO_SECRET_KEY', dest='output_model_secret_key', type=str, required=True)

# go_manager: Kafka brokers/topic for messages and MinIO target for media files
parser.add_argument('--go_manager.brokers', dest='go_manager_brokers', type=str, required=True)
parser.add_argument('--go_manager_topic', dest='go_manager_topic', type=str, required=True)
parser.add_argument('--go_manager', dest='minio_path', type=str, required=True)
parser.add_argument('--go_manager.minio_bucket', dest='minio_bucket', type=str, required=True)
parser.add_argument('--go_manager.minIO_URL', dest='minio_url', type=str, required=True)
parser.add_argument('--go_manager.minIO_ACCESS_KEY', dest='minio_access_key', type=str, required=True)
parser.add_argument('--go_manager.minIO_SECRET_KEY', dest='minio_secret_key', type=str, required=True)

# Training parameters (keyword arguments are separated by commas, not dots)
parser.add_argument("--labelCol", dest='labelCol', help="Name of the column containing the label", type=str, required=True)
parser.add_argument("--direct_args_to_sklearn_function", dest='direct_args_to_sklearn_function', help="This args are going to be injected to the KNeighborsClassifier.fit function directly", type=str, default='{"n_neighbors":3}')

# Options not declared above are collected in `unknown` instead of failing.
args, unknown = parser.parse_known_args()

def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
  """List the objects under a bucket folder whose name ends with an extension.

  Args:
    address: MinIO endpoint handed to the ``Minio`` client.
    access_key: MinIO access key.
    secret_key: MinIO secret key.
    bucket_name: bucket to list.
    folder: prefix to list; a trailing "/" is added when missing. An empty
      string is used as-is as the prefix instead of raising IndexError.
    extention: suffix the object name must end with (e.g. ".csv").
    use_ssl: forwarded to the client as ``secure``.

  Returns:
    A single "s3://bucket/object" string when exactly one object matches,
    a list of such strings when several match.

  Raises:
    Exception: when no object matches.
  """
  # Guard on emptiness first: indexing folder[-1] fails on an empty string.
  if folder and not folder.endswith("/"):
    folder = folder + "/"

  client = Minio(
    address,
    access_key=access_key,
    secret_key=secret_key,
    secure=use_ssl
  )
  objects = client.list_objects(bucket_name=bucket_name, prefix=folder)

  # Use the public `object_name` attribute rather than the private `_object_name`.
  files_list = [obj.object_name for obj in objects if obj.object_name.endswith(extention)]
  if len(files_list) > 1:
    return ["s3://" + bucket_name + "/" + name for name in files_list]
  elif len(files_list) == 1:
    return "s3://" + bucket_name + "/" + files_list[0]
  else:
    raise Exception("Dataset is empty!")

def upload_file_to_minio(minio_url, access_key, secret_key, bucket_name, object_name, local_file_path, use_ssl=False):
  """Upload one local file to a MinIO bucket.

  Args:
    minio_url: MinIO endpoint handed to the ``Minio`` client.
    access_key: MinIO access key.
    secret_key: MinIO secret key.
    bucket_name: destination bucket.
    object_name: name of the object to create in the bucket.
    local_file_path: path of the local file to send.
    use_ssl: forwarded to the client as ``secure``; defaults to False, the
      value that was previously hard-coded.
  """
  # Initialize the MinIO client
  client = Minio(minio_url, access_key=access_key, secret_key=secret_key, secure=use_ssl)

  # Upload the file
  client.fput_object(bucket_name, object_name, local_file_path)

def prepare_metadata_json(name, messageType, title=None, description=None, var=None, show=True):
  """Build the metadata dictionary describing one message.

  The dictionary holds the caller-supplied fields, a key derived from the
  name (lower-cased, spaces turned into dashes), a random UUID and the
  creation/modification timestamps. Execution-context fields are added only
  for the environment variables that are actually set.

  Returns:
    dict: the metadata.
  """
  # Environment variable -> metadata field, in insertion order.
  env_fields = (
    ("BDA_ID", 'bdaId'),
    ("SERVICE_ID", 'serviceId'),
    ("ORGANIZATION_ID", 'organizationId'),
    ("OWNER_ID", 'ownerId'),
    ("EXECUTION_ID", 'executionId'),
    ("ACCESS_LEVEL", 'accessLevel'),
    ("EXECUTOR_ID", 'executorId'),
    ("EXECUTOR_NAME", 'executorName'),
    ("EXECUTOR_ORG_ID", 'executorOrgId'),
    ("EXECUTOR_ORG_NAME", 'executorOrgName'),
  )

  metadata = {}
  metadata["name"] = name
  metadata["key"] = name.lower().replace(" ", "-")
  metadata["uuid"] = str(uuid.uuid4())
  metadata["messageType"] = messageType
  metadata["title"] = title
  metadata["description"] = description
  metadata["var"] = var
  metadata["created"] = str(datetime.now())
  metadata["modified"] = str(datetime.now())
  metadata["show"] = show

  for env_name, field_name in env_fields:
    if env_name in os.environ:
      metadata[field_name] = os.environ.get(env_name)

  return metadata

def prepare_file_metadata(localPath, path, extension, filename):
    """Build the metadata of a "file" message and attach the file details.

    The base dictionary comes from prepare_metadata_json (fixed name
    "validation_report.json"); the four arguments are then stored under the
    keys 'localPath', 'path', 'extension' and 'filename'.
    """
    file_metadata = prepare_metadata_json(name="validation_report.json", messageType="file")
    file_metadata.update(
        localPath=localPath,
        path=path,
        extension=extension,
        filename=filename,
    )
    return file_metadata

# One shared producer connected to the go_manager brokers (comma-separated list).
producer = KafkaProducer(bootstrap_servers=args.go_manager_brokers.split(","))

def send_message(data):
  """Serialize ``data`` as UTF-8 JSON, publish it on the go_manager topic and flush."""
  payload = json.dumps(data).encode('utf-8')
  producer.send(args.go_manager_topic, payload)
  producer.flush()

# Load the input dataset (attribute names match the parser's dest values:
# input_dataset_access_key / input_dataset_secret_key / input_dataset_minio_url)
storage_options = {
  'key': args.input_dataset_access_key,
  'secret': args.input_dataset_secret_key,
  'client_kwargs': {
    'endpoint_url': f'{args.input_dataset_minio_url}'
  }
}

file_path = minio_ls(args.input_dataset_minio_url, args.input_dataset_access_key, args.input_dataset_secret_key, args.input_dataset_minio_bucket, args.input_dataset, ".csv")
dataset = pd.read_csv(file_path, storage_options=storage_options)

# Split the label column from the features
y = dataset[args.labelCol]
X = dataset.drop(args.labelCol, axis=1)

# The property holds constructor keyword arguments (e.g. {"n_neighbors": 3}),
# so the parsed dict must be unpacked rather than passed as one positional value.
neigh = KNeighborsClassifier(**json.loads(args.direct_args_to_sklearn_function))
neigh.fit(X=X, y=y)

# Save the plot locally. from_estimator already draws the matrix, so no extra
# plot() call is needed before saving.
disp = ConfusionMatrixDisplay.from_estimator(neigh, X, y)
disp.ax_.set_title("Confusion Matrix")
plt.savefig("conf_matrix.png")

# prepare_file_metadata requires four values: local path, path, extension, filename
metadata = prepare_file_metadata("conf_matrix.png", "conf_matrix.png", "picture", "conf_matrix.png")

# Send the plot to MinIO (the bucket is stored on args as `minio_bucket`)
# NOTE(review): the object is stored under args.minio_path while the metadata
# 'path' is "conf_matrix.png" — confirm which one the platform expects.
upload_file_to_minio(args.minio_url, args.minio_access_key, args.minio_secret_key, args.minio_bucket, args.minio_path, 'conf_matrix.png')
send_message(metadata)

# Save the trained model locally as a .joblib file
model_path = "./temp"
os.makedirs(model_path, exist_ok=True)
model_file = os.path.join(model_path, 'model.joblib')
dump(neigh, model_file)

# Upload to MinIO: send the model FILE (not its directory) and store it
# inside the output-model folder.
upload_file_to_minio(args.output_model_minio_url, args.output_model_access_key, args.output_model_secret_key, args.output_model_minio_bucket, args.output_model.rstrip("/") + "/model.joblib", model_file)

Metamodel

{
    "name": "knn-fit",
    "description": "Knn model training",
    "mode": "BATCH",
    "area": "ANALYSIS",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/knn-fit:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "framework": {
        "id": 6,
        "name": "Python",
        "version": "3",
        "imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
    },
    "properties": [
        {
            "description": "Your input dataset description.",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your model description",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-model",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Name of the column containing the label",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "labelCol",
            "valueType": "STRING"
        },
        {
            "description": "These args are going to be injected into the KNeighborsClassifier constructor directly",
            "mandatory": true,
            "type": "application",
            "defaultValue": "{\"n_neighbors\":3}",
            "key": "direct_args_to_sklearn_function",
            "valueType": "STRING"
        }
    ]
}

Es. 8: Generic Assets

Diagramma

service-example-no-8

Codice Programma Nucleo

package main

import (
    "encoding/csv"
    "fmt"
    "os"
    "path/filepath"
)

// appendHelloWorld copies the first CSV file matched in inputDir to
// outputDir/output.csv, adding one extra row that holds "hello world".
// Every failure is printed and ends the function early; nothing is returned
// to the caller.
func appendHelloWorld(inputDir, outputDir string) {
    // Find the CSV files in the input directory
    files, err := filepath.Glob(filepath.Join(inputDir, "*.csv"))
    if err != nil {
        fmt.Println("Errore nella ricerca dei file:", err)
        return
    }

    if len(files) == 0 {
        fmt.Println("Nessun file CSV trovato nella directory di input")
        return
    }

    // Use the first file found
    inputFile := files[0]
    outputFile := filepath.Join(outputDir, "output.csv")

    // Open the input file
    file, err := os.Open(inputFile)
    if err != nil {
        fmt.Println("Errore nell'apertura del file:", err)
        return
    }
    defer file.Close()

    reader := csv.NewReader(file)
    rows, err := reader.ReadAll()
    if err != nil {
        fmt.Println("Errore nella lettura del file:", err)
        return
    }

    // Append a new row containing "hello world"
    rows = append(rows, []string{"hello world"})

    // Create the output directory if it does not exist
    if err := os.MkdirAll(outputDir, os.ModePerm); err != nil {
        fmt.Println("Errore nella creazione della directory di output:", err)
        return
    }

    // Write the modified CSV file
    outFile, err := os.Create(outputFile)
    if err != nil {
        fmt.Println("Errore nella creazione del file di output:", err)
        return
    }

    writer := csv.NewWriter(outFile)
    if err := writer.WriteAll(rows); err != nil {
        // Best-effort close on the failure path; the write error is the one reported.
        outFile.Close()
        fmt.Println("Errore nella scrittura del file CSV:", err)
        return
    }

    // Close explicitly instead of deferring: a failed Close on a file that was
    // written to can mean lost data, so it must not be ignored.
    if err := outFile.Close(); err != nil {
        fmt.Println("Errore nella chiusura del file di output:", err)
        return
    }

    fmt.Println("File salvato:", outputFile)
}

// main runs the example on the fixed asset paths: the dataset is read from
// /inputs and the result is written under /outputs.
func main() {
    const (
        inputDir  = "/inputs"
        outputDir = "/outputs"
    )
    appendHelloWorld(inputDir, outputDir)
}

Metamodel

{
  "name": "generic-go-ex1",
  "version": "1.0.0",
  "accessLevel": "PUBLIC",
  "area": "PREPARATION",
  "description": "",
  "mode": "BATCH",
  "properties": [
    {
      "defaultValue": "{\"inputs\": [{\"name\": \"input-data\", \"path\": \"/inputs\", \"type\": \"dataset\",\"description\": \"my description\"}], \"outputs\": [{\"name\": \"output-data\", \"path\": \"/outputs\", \"type\": \"dataset\",\"description\": \"my description\"}]}",
      "description": "",
      "externalized": false,
      "extra": null,
      "invisible": true,
      "key": "assets",
      "mandatory": true,
      "type": "application",
      "uri": null,
      "value": null,
      "valueType": "JSON"
    }
  ],
  "ref": null,
  "statusModel": null,
  "tags": [],
  "url": "docker://gitlab.alidalab.it:5000/alida/analytics2/example-go-1:latest"
}