Raccolta esempi di Service
Di seguito troverai una serie di esempi di Service. Per semplicità forniremo il metamodello in formato JSON. Per maggiori informazioni sulla definizione manuale del metamodello, consultare la relativa pagina della documentazione.
Es. 1: Input e Output di Tipo Dataset su MinIO
In lavorazione ...
Diagramma
Codice Programma Nucleo
import argparse

import pandas as pd
from minio import Minio
def str2bool(v):
    """Convert a command-line value to a boolean.

    Meant to be used as an ``argparse`` ``type=`` converter.

    Args:
        v: A ``bool`` (returned unchanged) or a string, matched
            case-insensitively.

    Returns:
        ``True`` for 'yes', 'true', 't', 'y', '1'; ``False`` for 'no',
        'false', 'f', 'n', '0' and the empty string.

    Raises:
        argparse.ArgumentTypeError: If the string is none of the above.
    """
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in ('yes', 'true', 't', 'y', '1'):
        return True
    if lowered in ('no', 'false', 'f', 'n', '0', ''):
        return False
    # Fail loudly: falling through would silently yield None for a typo.
    raise argparse.ArgumentTypeError(f"Boolean value expected, got {v!r}")
# Command-line interface of the service. Every option is required except
# '--input-columns'.
parser = argparse.ArgumentParser()
# Input dataset: the main value is later used as the folder to list on MinIO;
# the dotted options carry the connection details for that dataset.
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_access_key', type=str, required=True)
# NOTE(review): this flag contains a 'minio_bucket.' segment while the output
# secret-key flag below does not — confirm which spelling the platform passes.
parser.add_argument('--input-dataset.minio_bucket.minIO_SECRET_KEY', dest='input_secret_key', type=str, required=True)
parser.add_argument('--input-dataset.use_ssl', dest='input_use_ssl', type=str2bool, required=True)
# Optional comma-separated list of columns to keep.
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)
# Output dataset: the main value is later used as the destination folder.
parser.add_argument('--output-dataset', dest='output_dataset', type=str, required=True)
parser.add_argument('--output-dataset.minio_bucket', dest='output_minio_bucket', type=str, required=True)
parser.add_argument('--output-dataset.minIO_URL', dest='output_minio_url', type=str, required=True)
parser.add_argument('--output-dataset.minIO_ACCESS_KEY', dest='output_access_key', type=str, required=True)
parser.add_argument('--output-dataset.minIO_SECRET_KEY', dest='output_secret_key', type=str, required=True)
parser.add_argument('--output-dataset.use_ssl', dest='output_use_ssl', type=str2bool, required=True)
# parse_known_args collects unrecognized options in `unknown` instead of
# aborting on them.
args, unknown = parser.parse_known_args()
def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List the objects under a MinIO folder whose name ends with a suffix.

    Args:
        address: MinIO endpoint; any "http://" or "https://" is removed
            before it is handed to the client.
        access_key: Access key passed to the client.
        secret_key: Secret key passed to the client.
        bucket_name: Bucket to list.
        folder: Prefix to list. A leading "/" is dropped and a trailing "/"
            is appended when missing.
        extention: Object-name suffix to keep (for example ".csv").
        use_ssl: Passed to the client as ``secure``.

    Returns:
        A single "s3://<bucket>/<key>" string when exactly one object
        matches, otherwise a list of such strings.

    Raises:
        Exception: If no object matches ("Dataset is empty!").
    """
    # Normalize path; the emptiness check avoids an IndexError on "" or "/".
    if folder.startswith("/"):
        folder = folder[1:]
    if folder and not folder.endswith("/"):
        folder = folder + "/"
    endpoint = address.replace("http://", "").replace("https://", "")
    client = Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl,
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)
    # Rely on the public ``object_name`` attribute, not the private field.
    files_list = [obj.object_name for obj in objects if obj.object_name.endswith(extention)]
    if not files_list:
        raise Exception("Dataset is empty!")
    if len(files_list) == 1:
        return "s3://" + bucket_name + "/" + files_list[0]
    return ["s3://" + bucket_name + "/" + name for name in files_list]
def select_columns(df, colonne: str = None):
    """Return ``df`` restricted to the comma-separated columns in ``colonne``.

    Args:
        df: Object indexable with a list of column names (a DataFrame).
        colonne: Comma-separated column names. Whitespace around each name
            is ignored, and empty entries (e.g. from a trailing comma) are
            skipped.

    Returns:
        ``df`` itself when ``colonne`` is ``None`` or names no column,
        otherwise ``df[<list of names>]``.
    """
    if colonne is None:
        return df
    # Build a fresh list instead of rewriting the one being iterated.
    selected_columns = [name.strip() for name in colonne.split(',') if name.strip()]
    if not selected_columns:
        return df
    return df[selected_columns]
def load_from_minio(args):
    """Read the input CSV dataset from MinIO and apply the column selection.

    Args:
        args: Parsed arguments; the ``input_*`` attributes are read.

    Returns:
        The loaded data restricted to ``args.input_columns`` by
        ``select_columns``.
    """
    storage_options = {
        'key': args.input_access_key,
        'secret': args.input_secret_key,
        'client_kwargs': {
            'endpoint_url': args.input_minio_url
        }
    }
    file_path = minio_ls(args.input_minio_url, args.input_access_key, args.input_secret_key,
                         args.input_minio_bucket, args.input_dataset, ".csv", args.input_use_ssl)
    # minio_ls returns one path or a list of paths, while read_csv reads a
    # single file: load each one and stack the rows.
    csv_paths = [file_path] if isinstance(file_path, str) else file_path
    frames = [
        pd.read_csv(path, storage_options=storage_options, sep=None, engine='python')
        for path in csv_paths
    ]
    dataset = frames[0] if len(frames) == 1 else pd.concat(frames, ignore_index=True)
    return select_columns(dataset, colonne=args.input_columns)
# Load the input dataset as soon as the script runs.
df = load_from_minio(args)
def save_dataset_to_minio(df,args):
    """Write ``df`` as a CSV file into the output dataset folder on MinIO.

    Args:
        df: Object exposing ``to_csv`` (a DataFrame).
        args: Parsed arguments; the ``output_*`` attributes are read.
    """
    client_settings = dict(endpoint_url=args.output_minio_url)
    storage_options = dict(
        key=args.output_access_key,
        secret=args.output_secret_key,
        client_kwargs=client_settings,
    )
    file_path = f"s3://{args.output_minio_bucket}/{args.output_dataset}/output-dataset.csv"
    # Trace the destination before writing.
    print(f"[TO_CSV] path={file_path} endpoint={args.output_minio_url} bucket={args.output_minio_bucket}")
    df.to_csv(file_path, storage_options=storage_options, index=False)
# Write the loaded data to the output dataset location.
save_dataset_to_minio(df,args)
Metamodel
{
"name": "example",
"description": "example of input and output data",
"mode": "BATCH",
"area": "PREPARATION",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/example:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"framework": {
"id": 6,
"name": "Python",
"version": "3",
"imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
},
"properties": [
{
"description": "Your input dataset",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your output dataset",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-dataset",
"valueType": "STRING",
"invisible": true
}
]
}
Es. 2: Input e Output di Tipo Stream su Kafka
Diagramma
Codice Programma Nucleo
import argparse
from kafka import KafkaConsumer, KafkaProducer
from arguments import args
# CLI Arguments definition
# All four options are required. The dataset values are used below as Kafka
# topic names; the '.kafka_brokers' values are comma-separated broker lists.
parser = argparse.ArgumentParser()
parser.add_argument('--input-dataset', dest='input_topic', type=str, required=True)
parser.add_argument('--input-dataset.kafka_brokers', dest='input_kafka_brokers', type=str, required=True)
parser.add_argument('--output-dataset', dest='output_topic', type=str, required=True)
parser.add_argument('--output-dataset.kafka_brokers', dest='output_kafka_brokers', type=str, required=True)
# Rebinds `args` (also imported from `arguments` above); options the parser
# does not know end up in `unknown`.
args, unknown = parser.parse_known_args()
# Business logic
# Consumer for the input topic; the broker list is split on commas.
consumer = KafkaConsumer(
args.input_topic,
bootstrap_servers=args.input_kafka_brokers.split(",")
)
# Producer connected to the output brokers (may differ from the input ones).
producer = KafkaProducer(
bootstrap_servers=args.output_kafka_brokers.split(",")
)
def _transform_data(input_data):
    """Transform one consumed message value before it is published.

    Placeholder for the service logic: the value is currently returned
    unchanged.
    """
    # Here the service logic has to be implemented
    transformed = input_data
    return transformed
# Forward every consumed message: transform its value, then publish the
# result on the output topic.
for incoming in consumer:
    producer.send(args.output_topic, _transform_data(incoming.value))
Metamodel
{
"name": "consume-and-publish",
"description": "Consume and publish example",
"mode": "BATCH",
"area": "PREPARATION",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/consume-and-publish:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"framework": {
"id": 6,
"name": "Python",
"version": "3",
"imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
},
"properties": [
{
"defaultValue": null,
"description": "The input channel where to read the text.",
"extra": {
"datasetType": null,
"mode": "streaming"
},
"invisible": true,
"key": "input-dataset",
"mandatory": true,
"type": "application",
"valueType": "STRING"
},
{
"defaultValue": null,
"description": "The output channel to publish to.",
"extra": {
"datasetType": null,
"mode": "streaming"
},
"invisible": true,
"key": "output-dataset",
"mandatory": true,
"type": "application",
"valueType": "STRING"
}
]
}
Es. 3: Input Dataset e Output Model su MinIO
Diagramma
Codice Programma Nucleo
import argparse
from sklearn.model_selection import train_test_split
from sklearn.cluster import KMeans
import joblib
import os
from minio import Minio
def str2bool(v):
    """Convert a command-line value to a boolean.

    Meant to be used as an ``argparse`` ``type=`` converter.

    Args:
        v: A ``bool`` (returned unchanged) or a string, matched
            case-insensitively.

    Returns:
        ``True`` for 'yes', 'true', 't', 'y', '1'; ``False`` for 'no',
        'false', 'f', 'n', '0' and the empty string.

    Raises:
        argparse.ArgumentTypeError: If the string is none of the above.
    """
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in ('yes', 'true', 't', 'y', '1'):
        return True
    if lowered in ('no', 'false', 'f', 'n', '0', ''):
        return False
    # Fail loudly: falling through would silently yield None for a typo.
    raise argparse.ArgumentTypeError(f"Boolean value expected, got {v!r}")
# Command-line interface of the training service. Every option is required
# except '--input-columns' and '--test_split'.
parser = argparse.ArgumentParser()
# Input dataset: the main value is later used as the folder to list on MinIO;
# the dotted options carry the connection details for that dataset.
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket.minIO_SECRET_KEY', dest='input_secret_key', type=str, required=True)
parser.add_argument('--input-dataset.use_ssl', dest='input_use_ssl', type=str2bool, required=True)
# Optional comma-separated list of columns to train on.
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)
# Output model: destination of the trained model on MinIO.
parser.add_argument('--output-model', dest='output_model', type=str, required=True)
parser.add_argument('--output-model.minio_bucket', dest='output_minio_bucket', type=str, required=True)
parser.add_argument('--output-model.minIO_URL', dest='output_minio_url', type=str, required=True)
parser.add_argument('--output-model.minIO_ACCESS_KEY', dest='output_access_key', type=str, required=True)
parser.add_argument('--output-model.minio_bucket.minIO_SECRET_KEY', dest='output_secret_key', type=str, required=True)
parser.add_argument('--output-model.use_ssl', dest='output_use_ssl', type=str2bool, required=True)
# No explicit dest: argparse stores this option as `args.test_split`.
parser.add_argument("--test_split", help="Test fraction from 0 to 1.", type=float, default=0.2)
# parse_known_args collects unrecognized options in `unknown` instead of
# aborting on them.
args, unknown = parser.parse_known_args()
def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List the objects under a MinIO folder whose name ends with a suffix.

    Args:
        address: MinIO endpoint; any "http://" or "https://" is removed
            before it is handed to the client.
        access_key: Access key passed to the client.
        secret_key: Secret key passed to the client.
        bucket_name: Bucket to list.
        folder: Prefix to list. A leading "/" is dropped and a trailing "/"
            is appended when missing.
        extention: Object-name suffix to keep (for example ".csv").
        use_ssl: Passed to the client as ``secure``.

    Returns:
        A single "s3://<bucket>/<key>" string when exactly one object
        matches, otherwise a list of such strings.

    Raises:
        Exception: If no object matches ("Dataset is empty!").
    """
    # Normalize path; the emptiness check avoids an IndexError on "" or "/".
    if folder.startswith("/"):
        folder = folder[1:]
    if folder and not folder.endswith("/"):
        folder = folder + "/"
    endpoint = address.replace("http://", "").replace("https://", "")
    client = Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl,
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)
    # Rely on the public ``object_name`` attribute, not the private field.
    files_list = [obj.object_name for obj in objects if obj.object_name.endswith(extention)]
    if not files_list:
        raise Exception("Dataset is empty!")
    if len(files_list) == 1:
        return "s3://" + bucket_name + "/" + files_list[0]
    return ["s3://" + bucket_name + "/" + name for name in files_list]
def upload_file_to_minio(minio_url, access_key, secret_key, bucket_name, object_name, local_file_path, use_ssl=False):
    """Upload a local file to a MinIO bucket.

    Args:
        minio_url: MinIO endpoint; any "http://" or "https://" is removed.
        access_key: Access key passed to the client.
        secret_key: Secret key passed to the client.
        bucket_name: Destination bucket.
        object_name: Destination object key.
        local_file_path: Path of the file to upload.
        use_ssl: Passed to the client as ``secure``.
    """
    # The client is given the endpoint without its URL scheme.
    host = minio_url
    for scheme in ("http://", "https://"):
        host = host.replace(scheme, "")
    minio_client = Minio(host, access_key=access_key, secret_key=secret_key, secure=use_ssl)
    minio_client.fput_object(bucket_name, object_name, local_file_path)
# Load the input dataset
import pandas as pd  # used by read_csv/concat below; absent from this example's imports

storage_options = {
    'key': args.input_access_key,
    'secret': args.input_secret_key,
    'client_kwargs': {
        'endpoint_url': f'{args.input_minio_url}'
    }
}
# Forward the parsed --input-dataset.use_ssl flag instead of silently falling
# back to the helper's default (no TLS).
file_path = minio_ls(args.input_minio_url, args.input_access_key, args.input_secret_key,
                     args.input_minio_bucket, args.input_dataset, ".csv", args.input_use_ssl)
# minio_ls returns one path or a list of paths, while read_csv reads a single
# file: load each one and stack the rows.
csv_paths = [file_path] if isinstance(file_path, str) else file_path
frames = [
    pd.read_csv(path, storage_options=storage_options, sep=None, engine='python')
    for path in csv_paths
]
dataset = frames[0] if len(frames) == 1 else pd.concat(frames, ignore_index=True)

if args.input_columns is not None:
    selected_columns = [c.strip() for c in args.input_columns.split(",")]
    df = dataset[selected_columns]
else:
    df = dataset.copy()

# Split the dataset into training and testing sets
# Only the training set is used here
df = train_test_split(df, test_size=args.test_split, random_state=42)[0]

# Extract the feature matrix from the dataframe
X = df.values

# Initialize and train a KMeans clustering model
kmeans = KMeans(n_clusters=3, random_state=42)
kmeans.fit(X)

# Save the trained model locally as a .pkl file
model_folder = "mymodel"
os.makedirs(model_folder, exist_ok=True)
model_local_path = os.path.join(model_folder, "kmeans_model.pkl")
joblib.dump(kmeans, model_local_path)

# Upload to MinIO. The file is stored *inside* the output-model folder so
# that loaders listing "<folder>/" with a model extension can find it.
model_object_name = f"{args.output_model.strip('/')}/{os.path.basename(model_local_path)}"
upload_file_to_minio(args.output_minio_url, args.output_access_key, args.output_secret_key,
                     args.output_minio_bucket, model_object_name, model_local_path, args.output_use_ssl)
Metamodel
{
"name": "kmeans-clustering",
"description": "Kmeans clustering model",
"mode": "BATCH",
"area": "ANALYSIS",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/kmeans-clustering:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"framework": {
"id": 6,
"name": "Python",
"version": "3",
"imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
},
"properties": [
{
"description": "Your input dataset description.",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your model description",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-model",
"valueType": "STRING",
"invisible": true
},
{
"description": "Test fraction from 0 to 1",
"mandatory": true,
"type": "application",
"defaultValue": 0.2,
"key": "test_split",
"valueType": "DOUBLE"
}
]
}
Es. 4: Input Dataset + Input Model + Output Model
Diagramma

Codice Programma Nucleo
import argparse
import pandas as pd
from arguments import args
from joblib import load
from utils import upload_file_to_minio, load_first_from_minio, minio_ls
from model import predict
# Command-line interface of the prediction service: an input dataset, an
# input model and an output dataset, each with its MinIO connection options.
parser = argparse.ArgumentParser()
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_dataset_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_dataset_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket.minIO_SECRET_KEY', dest='input_dataset_secret_key', type=str, required=True)
parser.add_argument('--input-model', dest='input_model', type=str, required=True)
parser.add_argument('--input-model.minio_bucket', dest='input_model_minio_bucket', type=str, required=True)
parser.add_argument('--input-model.minIO_URL', dest='input_model_minio_url', type=str, required=True)
parser.add_argument('--input-model.minIO_ACCESS_KEY', dest='input_model_access_key', type=str, required=True)
parser.add_argument('--input-model.minio_bucket.minIO_SECRET_KEY', dest='input_model_secret_key', type=str, required=True)
parser.add_argument('--output-dataset', dest='output_dataset', type=str, required=True)
parser.add_argument('--output-dataset.minio_bucket', dest='output_minio_bucket', type=str, required=True)
parser.add_argument('--output-dataset.minIO_URL', dest='output_minio_url', type=str, required=True)
parser.add_argument('--output-dataset.minIO_ACCESS_KEY', dest='output_access_key', type=str, required=True)
parser.add_argument('--output-dataset.minio_bucket.minIO_SECRET_KEY', dest='output_secret_key', type=str, required=True)
# Keyword arguments are comma-separated (a stray '.' after dest made this
# line a syntax error).
parser.add_argument("--predColName", dest='predColName', help="Name of the column containing the predictions", type=str, default="prediction")
# Rebinds `args` (also imported from `arguments` above); options the parser
# does not know end up in `unknown`.
args, unknown = parser.parse_known_args()
def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List the objects under a MinIO folder whose name ends with a suffix.

    Args:
        address: MinIO endpoint; any "http://" or "https://" is removed
            before it is handed to the client.
        access_key: Access key passed to the client.
        secret_key: Secret key passed to the client.
        bucket_name: Bucket to list.
        folder: Prefix to list. A leading "/" is dropped and a trailing "/"
            is appended when missing.
        extention: Object-name suffix to keep (for example ".csv").
        use_ssl: Passed to the client as ``secure``.

    Returns:
        A single "s3://<bucket>/<key>" string when exactly one object
        matches, otherwise a list of such strings.

    Raises:
        Exception: If no object matches ("Dataset is empty!").
    """
    # Normalize path; the emptiness check avoids an IndexError on "" or "/".
    if folder.startswith("/"):
        folder = folder[1:]
    if folder and not folder.endswith("/"):
        folder = folder + "/"
    # The client expects a bare host[:port], so drop any URL scheme.
    endpoint = address.replace("http://", "").replace("https://", "")
    client = Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl,
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)
    # Rely on the public ``object_name`` attribute, not the private field.
    files_list = [obj.object_name for obj in objects if obj.object_name.endswith(extention)]
    if not files_list:
        raise Exception("Dataset is empty!")
    if len(files_list) == 1:
        return "s3://" + bucket_name + "/" + files_list[0]
    return ["s3://" + bucket_name + "/" + name for name in files_list]
def load_first_from_minio(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """Download the first object under a MinIO folder that ends with a suffix.

    Args:
        address: MinIO endpoint; any "http://" or "https://" is removed.
        access_key: Access key passed to the client.
        secret_key: Secret key passed to the client.
        bucket_name: Bucket to search.
        folder: Prefix to list. A leading "/" is dropped and a trailing "/"
            is appended when missing.
        extention: Object-name suffix to look for (for example ".joblib").
        use_ssl: Passed to the client as ``secure``.

    Returns:
        Path of the downloaded file: the object's base name, relative to the
        current working directory.

    Raises:
        FileNotFoundError: If no object under ``folder`` has the suffix.
    """
    import os  # local import: this example's import block does not provide it

    if folder.startswith("/"):
        folder = folder[1:]
    if folder and not folder.endswith("/"):
        folder = folder + "/"
    endpoint = address.replace("http://", "").replace("https://", "")
    client = Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl,
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)
    # Find the first model file
    model_file = next((obj for obj in objects if obj.object_name.endswith(extention)), None)
    if model_file is None:
        # Without this guard the code below fails with an opaque
        # AttributeError on None.
        raise FileNotFoundError(f"No '{extention}' file found in {bucket_name}/{folder}")
    # The local copy keeps the object's file name
    local_file_path = os.path.basename(model_file.object_name)
    # Download the file
    client.fget_object(bucket_name, model_file.object_name, local_file_path)
    return local_file_path
def upload_file_to_minio(minio_url, access_key, secret_key, bucket_name, object_name, local_file_path, use_ssl=False):
    """Upload a local file to a MinIO bucket.

    Args:
        minio_url: MinIO endpoint; any "http://" or "https://" is removed.
        access_key: Access key passed to the client.
        secret_key: Secret key passed to the client.
        bucket_name: Destination bucket.
        object_name: Destination object key.
        local_file_path: Path of the file to upload.
        use_ssl: Passed to the client as ``secure``. Defaults to ``False``,
            which is what this helper previously hard-coded.
    """
    # Initialize the MinIO client with a bare host[:port] endpoint
    endpoint = minio_url.replace("http://", "").replace("https://", "")
    client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=use_ssl)
    # Upload the file
    client.fput_object(bucket_name, object_name, local_file_path)
def predict():
    """Load the model and the input dataset from MinIO and add predictions.

    Reads the module-level ``args``. The model is the first ".joblib" file
    found in the input-model folder; the dataset is the CSV found in the
    input-dataset folder.

    Returns:
        The input dataset with one extra column, named ``args.predColName``,
        holding the model's predictions.
    """
    model_path = load_first_from_minio(args.input_model_minio_url, args.input_model_access_key,
                                       args.input_model_secret_key, args.input_model_minio_bucket,
                                       args.input_model, ".joblib")
    knn = load(model_path)
    storage_options = {
        'key': args.input_dataset_access_key,
        'secret': args.input_dataset_secret_key,
        'client_kwargs': {
            # The parser stores the URL as `input_dataset_minio_url`;
            # `args.input_minio_url` does not exist.
            'endpoint_url': f'{args.input_dataset_minio_url}'
        }
    }
    file_path = minio_ls(args.input_dataset_minio_url, args.input_dataset_access_key,
                         args.input_dataset_secret_key, args.input_dataset_minio_bucket,
                         args.input_dataset, ".csv")
    dataset = pd.read_csv(file_path, storage_options=storage_options, sep=None, engine='python')
    result = knn.predict(dataset)
    result = pd.DataFrame(result, columns=[args.predColName])
    dataset[args.predColName] = result[args.predColName]
    return dataset
# Run the prediction, then write the result as CSV into the output dataset
# folder on MinIO.
df = predict()
storage_options = dict(
    key=args.output_access_key,
    secret=args.output_secret_key,
    client_kwargs=dict(endpoint_url=f'{args.output_minio_url}'),
)
file_path = f"s3://{args.output_minio_bucket}/{args.output_dataset}/dataset.csv"
df.to_csv(file_path, storage_options=storage_options, index=False)
Metamodel
{
"name": "knn-predict",
"description": "Knn prediction model",
"mode": "BATCH",
"area": "ANALYSIS",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/knn-predict:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"framework": {
"id": 6,
"name": "Python",
"version": "3",
"imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
},
"properties": [
{
"description": "Your input dataset description.",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your input model description",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-model",
"valueType": "STRING",
"invisible": true
},
{
"description": "Your output dataset",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-dataset",
"valueType": "STRING",
"invisible": true
},
{
"description": "Name of the column containing the predictions",
"mandatory": true,
"type": "application",
"defaultValue": "prediction",
"key": "predColName",
"valueType": "STRING"
}
]
}
Es. 5: Prediction Input Stream + Input Model + Output Stream
Diagramma

Codice Programma Nucleo
import argparse
import os
from pathlib import Path
import dill
import joblib
import pickle
import numpy as np
import pandas as pd
from alibi.explainers import AnchorTabular
# Command-line interface of the streaming prediction service: Kafka input and
# output topics plus the MinIO location of the model to load.
parser = argparse.ArgumentParser()
parser.add_argument('--input-dataset', dest='input_topic', type=str, required=True)
parser.add_argument('--input-dataset.kafka_brokers', dest='input_kafka_brokers', type=str, required=True)
parser.add_argument('--output-dataset', dest='output_topic', type=str, required=True)
parser.add_argument('--output-dataset.kafka_brokers', dest='output_kafka_brokers', type=str, required=True)
parser.add_argument('--input-model', dest='input_model', type=str, required=True)
parser.add_argument('--input-model.minio_bucket', dest='input_model_minio_bucket', type=str, required=True)
parser.add_argument('--input-model.minIO_URL', dest='input_model_minio_url', type=str, required=True)
parser.add_argument('--input-model.minIO_ACCESS_KEY', dest='input_model_access_key', type=str, required=True)
parser.add_argument('--input-model.minio_bucket.minIO_SECRET_KEY', dest='input_model_secret_key', type=str, required=True)
# Help texts now describe the option they belong to (they previously carried
# the other option's text and a leftover KNeighborsClassifier sentence).
parser.add_argument("--feature_to_predict", dest='feature_to_predict', help="The feature to predict.", required=False, type=str)
# NOTE(review): the default still carries a KNN-style key ('n_neighbors');
# confirm whether the explainer's explain method should receive it.
parser.add_argument("--direct_args_to_explainer_function", dest='direct_args_to_explainer_function', help="These arguments will be injected directly into the explain method of the Anchor Tabular explainer.", type=str, default='{"n_neighbors":3}')
# parse_known_args collects unrecognized options in `unknown` instead of
# aborting on them.
args, unknown = parser.parse_known_args()
def load_first_from_minio(address, access_key, secret_key, bucket_name, folder, use_ssl=False):
    """Download the first model file found under a MinIO folder.

    A model file is an object whose name ends with ".joblib", ".sav" or
    ".pkl".

    Args:
        address: MinIO endpoint; any "http://" or "https://" is removed.
        access_key: Access key passed to the client.
        secret_key: Secret key passed to the client.
        bucket_name: Bucket to search.
        folder: Prefix to list. A leading "/" is dropped and a trailing "/"
            is appended when missing.
        use_ssl: Passed to the client as ``secure``.

    Returns:
        Path of the downloaded file: the object's base name, relative to the
        current working directory.

    Raises:
        FileNotFoundError: If the folder contains no model file.
    """
    model_suffixes = (".joblib", ".sav", ".pkl")
    if folder.startswith("/"):
        folder = folder[1:]
    if folder and not folder.endswith("/"):
        folder = folder + "/"
    endpoint = address.replace("http://", "").replace("https://", "")
    client = Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl,
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)
    # Find the first model file (str.endswith accepts a tuple of suffixes)
    model_file = next((obj for obj in objects if obj.object_name.endswith(model_suffixes)), None)
    if model_file is None:
        # Without this guard the code below fails with an opaque
        # AttributeError on None.
        raise FileNotFoundError(
            f"No model file ({', '.join(model_suffixes)}) found in {bucket_name}/{folder}"
        )
    # The local copy keeps the object's file name
    local_file_path = os.path.basename(model_file.object_name)
    # Download the file
    client.fget_object(bucket_name, model_file.object_name, local_file_path)
    return local_file_path
def find_folder(parent_path, folder_name):
    """
    Recursively searches for a folder with the specified name inside the given parent directory.
    Only directories (not files) are considered.

    :param parent_path: Path to the root directory where the search should begin.
    :param folder_name: Name of the folder to find.
    :return: Full path to the first matching folder met while walking the tree.
    :raises FileNotFoundError: If ``parent_path`` does not exist, or if the folder
        is not found under it.
    :raises NotADirectoryError: If ``parent_path`` is not a directory.
    """
    if not os.path.exists(parent_path):
        raise FileNotFoundError(f"The directory '{parent_path}' does not exist.")
    if not os.path.isdir(parent_path):
        raise NotADirectoryError(f"The path '{parent_path}' is not a directory.")
    for root, dirs, _ in os.walk(parent_path):
        if folder_name in dirs:
            return os.path.join(root, folder_name)
    # Honor the documented contract instead of implicitly returning None.
    raise FileNotFoundError(f"Folder '{folder_name}' not found under '{parent_path}'.")
def download_folder_from_minio(endpoint, access_key, secret_key, bucket, path, destination, secure=False):
    """Download every object under a MinIO prefix into a local directory.

    The part of each object name that follows the prefix is recreated below
    ``destination``. A line is printed for each downloaded object, and a
    message is printed when the prefix holds no object.

    Args:
        endpoint: MinIO endpoint; any "http://" or "https://" is removed.
        access_key: Access key passed to the client.
        secret_key: Secret key passed to the client.
        bucket: Bucket to read from.
        path: Prefix to download; a leading "/" is dropped and a trailing
            "/" is appended to a non-empty prefix.
        destination: Local directory receiving the files.
        secure: Passed to the client as ``secure``.
    """
    host = endpoint.replace("http://", "").replace("https://", "")
    client = Minio(host, access_key=access_key, secret_key=secret_key, secure=secure)

    # Normalize the prefix
    if path.startswith("/"):
        path = path[1:]
    if path and not path.endswith("/"):
        path = path + "/"

    downloaded_any = False
    for obj in client.list_objects(bucket, prefix=path, recursive=True):
        downloaded_any = True
        relative_name = obj.object_name[len(path):].lstrip("/")
        local_path = os.path.join(destination, relative_name)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        client.fget_object(bucket, obj.object_name, local_path)
        print(f"Downloaded {obj.object_name} -> {local_path}", flush=True)

    if not downloaded_any:
        print(f"Nessun file trovato in {bucket}/{path}", flush=True)
def build_predict_fn(model):
    """Build the callable used to query ``model`` for predictions.

    When the model exposes ``feature_names_in_``, the input is wrapped in a
    DataFrame with those column names before ``predict`` is called;
    otherwise a notice is printed and the input is passed through unchanged.

    Raises:
        AttributeError: If ``model`` has no ``predict`` attribute.
    """
    if not hasattr(model, 'predict'):
        raise AttributeError("The provided model does not have a 'predict' method.")

    if not hasattr(model, 'feature_names_in_'):
        print('motore addestrato senza features', flush=True)
        return lambda x: model.predict(x)

    def _predict_with_feature_names(x):
        # Column names are read from the model on every call.
        frame = pd.DataFrame(x, columns=model.feature_names_in_.tolist())
        return model.predict(frame)

    return _predict_with_feature_names
def load_anchor_explainer(explainer_folder_path: str) -> AnchorTabular:
    """
    Load an AnchorTabular explainer saved with dill from a folder, restoring
    its sampler as well.

    The folder is expected to contain 'explainer.dill' and, optionally,
    'samplers.dill' and 'discretizer.dill'.

    NOTE(security): dill, like pickle, can execute arbitrary code while
    loading; only point this at artifacts from a trusted source.

    :param explainer_folder_path: path to the folder holding the dill files
    :return: the loaded explainer, with its sampler restored when both
        'discretizer.dill' and 'samplers.dill' are present
    :raises FileNotFoundError: if 'explainer.dill' is missing
    """
    explainer_file = Path(explainer_folder_path) / 'explainer.dill'
    sampler_file = Path(explainer_folder_path) / 'samplers.dill'
    discretizer_file = Path(explainer_folder_path) / 'discretizer.dill'
    if not explainer_file.exists():
        raise FileNotFoundError(f"Explainer file not found at {explainer_file}")
    with open(explainer_file, 'rb') as f:
        explainer: AnchorTabular = dill.load(f)
    # The samplers are stored separately and re-attached here
    if discretizer_file.exists():
        if sampler_file.exists():
            with open(discretizer_file, 'rb') as f:
                discretizer = dill.load(f)
            with open(sampler_file, 'rb') as f:
                samplers = dill.load(f)
            samplers.disc = discretizer
            explainer.samplers = [samplers]
            print("✅ Samplers ricaricati correttamente dall'explainer.", flush=True)
        else:
            # The warning sign had been stored as mis-decoded bytes.
            print("⚠ Attenzione: samplers.dill non trovato. L'explainer potrebbe non funzionare correttamente.", flush=True)
    return explainer
def extract_imputer_values_from_explainer(explainer) -> dict:
    """
    Build a single {feature_name: fill_value} mapping from a loaded explainer.

    - Numerical features: the middle element of the 'qts' values stored in
      the discretizer for that feature.
    - Categorical features: the first value listed in the sampler's
      feature_values.

    Features for which no value is available are left out.

    Returns:
        Dict[str, Any]: mapping {column_name: fill_value}
    """
    sampler = explainer.samplers[0]
    discretizer = sampler.disc
    names = explainer.feature_names
    imputed = {}

    # Middle 'qts' entry for numerical features
    for idx in explainer.numerical_features:
        if idx not in discretizer.lambdas:
            continue
        keywords = discretizer.lambdas[idx].keywords
        if 'qts' not in keywords:
            continue
        quantiles = keywords['qts']
        # len() rather than truthiness: the container may be an array
        if len(quantiles) > 0:
            imputed[names[idx]] = quantiles[len(quantiles) // 2]

    # First known value for categorical features
    for idx in sampler.categorical_features:
        if idx not in sampler.feature_values:
            continue
        known_values = sampler.feature_values[idx]
        if len(known_values) > 0:
            imputed[names[idx]] = known_values[0]

    return imputed
def get_anchors_results_with_prediction(sample_to_explain, fitted_anchors: AnchorTabular, explain_kwargs, target_class: str):
    """Explain and predict one sample, returning a prediction record.

    Both steps are best-effort: an error in either one is printed and the
    corresponding part of the result is left empty.

    Args:
        sample_to_explain: The sample; it is reshaped to a single row
            (``reshape(1, -1)``) for the prediction.
        fitted_anchors: Explainer exposing ``explain`` and ``predictor``.
        explain_kwargs: Keyword arguments forwarded to ``explain``.
        target_class: Value stored under 'name' in the result.

    Returns:
        A dict with 'name', 'value' (the prediction as a string, 'None' when
        the prediction failed) and 'explanation' (the anchor conditions
        joined with ' AND ', or an empty string).
    """
    anch_explanation = None
    try:
        encoded_instance = encode_categorical_features(sample_to_explain, fitted_anchors)
        anch_explanation = fitted_anchors.explain(encoded_instance, **explain_kwargs)['data']
    except Exception as e:
        print('While getting anchors results, an error occured:', flush=True)
        print(e, flush=True)
        anch_explanation = None

    prediction = None
    try:
        prediction = fitted_anchors.predictor(sample_to_explain.reshape(1, -1))[0]
    except Exception as e:
        print('While getting prediction, an error occured:', flush=True)
        print(e, flush=True)
        prediction = None

    if anch_explanation:
        anchor_text = ' AND '.join(anch_explanation.get('anchor', []))
    else:
        anchor_text = ''

    result = {}
    result['name'] = target_class
    result['value'] = str(prediction)
    result['explanation'] = anchor_text
    return result
import pandas as pd
from arguments import args
import pickle
import joblib
from utils import load_first_from_minio, minio_ls
from utils import *
from kafka import KafkaConsumer, KafkaProducer
import json  # used by json.loads/json.dumps below; absent from this example's imports

# Download the serialized model. load_first_from_minio selects the model file
# by extension on its own; an extra positional argument here would be taken
# as `use_ssl`.
model_path_str = load_first_from_minio(args.input_model_minio_url, args.input_model_access_key,
                                       args.input_model_secret_key, args.input_model_minio_bucket,
                                       args.input_model)

# Try joblib first, then fall back to plain pickle.
# NOTE(security): both formats can execute code while loading; only load
# models coming from a trusted source.
model_to_explain = None
try:
    model_to_explain = joblib.load(model_path_str)
except Exception:
    try:
        # Open the downloaded file (not the still-unset model variable).
        with open(model_path_str, "rb") as f:
            model_to_explain = pickle.load(f)
    except Exception as e:
        raise RuntimeError(f"Failed to load model {model_path_str}: {e}") from e

try:
    features_name_for_model = model_to_explain.feature_names_in_.tolist()
except AttributeError:
    features_name_for_model = None

predict_fn = build_predict_fn(model_to_explain)

# Download the explainer artifacts stored next to the model
explainer_remote_path = '.resources/input_model'
os.makedirs(explainer_remote_path, exist_ok=True)
download_folder_from_minio(args.input_model_minio_url, args.input_model_access_key,
                           args.input_model_secret_key, args.input_model_minio_bucket,
                           args.input_model, explainer_remote_path, secure=False)
explainer_path = find_folder(explainer_remote_path, 'explainer_model')
explainer = load_anchor_explainer(explainer_path)
# TODO: check whether this step is still needed
explainer.reset_predictor(predict_fn)

# Values used to fill missing features consistently with the explainer
impute_dict = extract_imputer_values_from_explainer(explainer)

direct_args_to_explainer_function = json.loads(args.direct_args_to_explainer_function)
if direct_args_to_explainer_function is None:
    direct_args_to_explainer_function = {}
if 'threshold' not in direct_args_to_explainer_function:
    direct_args_to_explainer_function['threshold'] = 0.7
if args.feature_to_predict is None:
    args.feature_to_predict = 'predicted_label'

consumer = KafkaConsumer(
    args.input_topic,
    bootstrap_servers=args.input_kafka_brokers.split(",")
)
producer = KafkaProducer(
    bootstrap_servers=args.output_kafka_brokers.split(",")
)

# KafkaConsumer is consumed by iterating it; each item carries the raw
# payload in `.value`, decoded here as JSON.
for kafka_message in consumer:
    try:
        message = json.loads(kafka_message.value)
    except (TypeError, ValueError) as e:
        print('Invalid JSON message, ignored:', flush=True)
        print(e, flush=True)
        continue

    if isinstance(message, dict):
        records = [message]
    elif isinstance(message, list) and all(isinstance(el, dict) for el in message):
        records = message
    else:
        print("⚠Messaggio non valido (né dict né lista di dict), ignorato.")
        continue

    df_input = pd.DataFrame(records)

    # Features required by the model but absent from the message start as NaN
    if features_name_for_model is not None:
        missing_features = [col for col in features_name_for_model if col not in df_input.columns]
        for col in missing_features:
            df_input[col] = np.nan
        df_input = df_input[features_name_for_model]

    # Fill missing values; reassigning avoids an in-place fill on a frame
    # obtained by column selection.
    df_input = df_input.fillna(impute_dict)

    # Row by row: prediction + explanation
    for i, row in df_input.iterrows():
        input_array = row.to_numpy(dtype=object)
        try:
            anchor_explain_dict = get_anchors_results_with_prediction(
                sample_to_explain=input_array,
                fitted_anchors=explainer,
                # Pass the parsed dict (with the default threshold), not the
                # raw JSON string from the command line.
                explain_kwargs=direct_args_to_explainer_function,
                target_class=args.feature_to_predict
            )
            # Add the result to the original record WITHOUT touching other keys
            record = records[i]
            record.setdefault('predictions', []).append(anchor_explain_dict)
        except Exception as e:
            print('Error of elaboration messages:', flush=True)
            print(e, flush=True)

    # The producer has no value serializer configured, so send JSON bytes.
    producer.send(args.output_topic, json.dumps(records).encode("utf-8"))
Metamodel
{
"accessLevel": "PUBLIC",
"area": "ANALYSIS",
"description": "Anchors explainer for tabular data",
"framework": {
"id": 6,
"imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png",
"name": "Python",
"version": "3"
},
"mode": "BATCH",
"name": "anchors-tabular-predict",
"properties": [
{
"defaultValue": null,
"description": "The feature to predict. Use for multiple prediction",
"invisible": false,
"key": "feature_to_predict",
"mandatory": false,
"type": "application",
"valueType": "STRING"
},
{
"defaultValue": null,
"description": "These arguments will be injected directly into the explain method of the Anchor Tabular explainer.",
"invisible": false,
"key": "direct_args_to_explainer_function",
"mandatory": false,
"type": "application",
"valueType": "JSON"
},
{
"defaultValue": null,
"description": "Your input model description.",
"inputData": false,
"invisible": true,
"key": "input-model",
"mandatory": true,
"type": "application",
"valueType": "STRING"
},
{
"defaultValue": null,
"description": "Data with prediction and explanations",
"extra": {
"datasetType": null,
"mode": "streaming"
},
"invisible": true,
"key": "output-dataset",
"mandatory": true,
"type": "application",
"valueType": "STRING"
},
{
"defaultValue": null,
"description": "Data to be explained.",
"extra": {
"datasetType": null,
"mode": "streaming"
},
"invisible": true,
"key": "input-dataset",
"mandatory": true,
"type": "application",
"valueType": "STRING"
}
],
"url": "docker://dockerhub.alidalab.it/alida/restricted/services/anchors-tabular-predict:1.1.0",
"version": "1.1.0"
}
Es. 6: Input Dataset Multipli + Output Dataset Multipli
Diagramma
Codice Programma Nucleo
import argparse
from minio import Minio
from arguments import args
from utils import upload_dataset, fetch_dataset
import pandas as pd
# Command-line interface of the service: three input datasets and three
# output datasets, each with its own MinIO bucket, URL and credentials.
parser = argparse.ArgumentParser()
for idx in (1, 2, 3):
    # Input dataset <idx>: folder plus MinIO connection settings.
    in_flag = f'--input-dataset-{idx}'
    in_dest = f'input_dataset_{idx}'
    parser.add_argument(in_flag, dest=in_dest, type=str, required=True)
    parser.add_argument(f'{in_flag}.minio_bucket', dest=f'{in_dest}_minio_bucket', type=str, required=True)
    parser.add_argument(f'{in_flag}.minIO_URL', dest=f'{in_dest}_minio_url', type=str, required=True)
    parser.add_argument(f'{in_flag}.minIO_ACCESS_KEY', dest=f'{in_dest}_access_key', type=str, required=True)
    parser.add_argument(f'{in_flag}.minio_bucket.minIO_SECRET_KEY', dest=f'{in_dest}_secret_key', type=str, required=True)
    # Output dataset <idx>: note that the folder is stored under "<prefix>_dataset".
    out_flag = f'--output-dataset-{idx}'
    out_dest = f'output_dataset_{idx}'
    parser.add_argument(out_flag, dest=f'{out_dest}_dataset', type=str, required=True)
    parser.add_argument(f'{out_flag}.minio_bucket', dest=f'{out_dest}_minio_bucket', type=str, required=True)
    parser.add_argument(f'{out_flag}.minIO_URL', dest=f'{out_dest}_minio_url', type=str, required=True)
    parser.add_argument(f'{out_flag}.minIO_ACCESS_KEY', dest=f'{out_dest}_access_key', type=str, required=True)
    parser.add_argument(f'{out_flag}.minIO_SECRET_KEY', dest=f'{out_dest}_secret_key', type=str, required=True)
# Unrecognised options are collected in `unknown` instead of aborting the run.
args, unknown = parser.parse_known_args()
def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List the objects under a MinIO folder whose name ends with an extension.

    Args:
        address: MinIO endpoint handed to the ``Minio`` client.
        access_key: Access key used to authenticate.
        secret_key: Secret key used to authenticate.
        bucket_name: Bucket to search.
        folder: Prefix to list; a trailing "/" is appended when missing.
        extention: Suffix the object names must end with (e.g. ".csv").
        use_ssl: Forwarded to the client as ``secure``.

    Returns:
        A single "s3://<bucket>/<object>" string when exactly one object
        matches, or a list of such strings when several match.

    Raises:
        Exception: If no object matches ("Dataset is empty!").
    """
    # endswith() also copes with an empty folder, where folder[-1] would
    # fail with an IndexError.
    if not folder.endswith("/"):
        folder = folder + "/"
    client = Minio(
        address,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)
    # Rely on the public object_name property instead of the private
    # _object_name attribute.
    uris = [
        "s3://" + bucket_name + "/" + obj.object_name
        for obj in objects
        if obj.object_name.endswith(extention)
    ]
    if not uris:
        raise Exception("Dataset is empty!")
    # Callers receive a bare string for a single match and a list otherwise.
    return uris[0] if len(uris) == 1 else uris
def fetch_dataset(dataset, minio_url, access_key, secret_key, bucket_name, extention):
    """Load a dataset stored on MinIO into a pandas DataFrame.

    Args:
        dataset: Folder (prefix) inside the bucket that holds the dataset files.
        minio_url: MinIO endpoint, used both for listing and for reading.
        access_key: Access key used to authenticate.
        secret_key: Secret key used to authenticate.
        bucket_name: Bucket that contains the dataset.
        extention: File extension of the files to read (e.g. ".csv").

    Returns:
        A DataFrame with the content of the matching file, or the row-wise
        concatenation of all matching files when there is more than one.

    Raises:
        Exception: Propagated from ``minio_ls`` when no file matches.
    """
    storage_options = {
        'key': access_key,
        'secret': secret_key,
        'client_kwargs': {
            'endpoint_url': f'{minio_url}'
        }
    }
    file_path = minio_ls(minio_url, access_key, secret_key, bucket_name, dataset, extention)
    # sep=None with the python engine lets pandas sniff the delimiter.
    read_options = {'storage_options': storage_options, 'sep': None, 'engine': 'python'}
    if isinstance(file_path, list):
        # minio_ls returns a list when the folder holds several files;
        # read_csv only accepts one path, so read each part and stack them.
        parts = [pd.read_csv(part, **read_options) for part in file_path]
        return pd.concat(parts, ignore_index=True)
    return pd.read_csv(file_path, **read_options)
def upload_dataset(df, path, minio_url, access_key, secret_key, bucket_name):
    """Write a DataFrame as ``dataset.csv`` under ``path`` in a MinIO bucket.

    Args:
        df: Object exposing ``to_csv`` (a pandas DataFrame in this service).
        path: Folder inside the bucket where ``dataset.csv`` is written.
        minio_url: MinIO endpoint used as the S3 endpoint URL.
        access_key: Access key used to authenticate.
        secret_key: Secret key used to authenticate.
        bucket_name: Destination bucket.
    """
    endpoint_settings = {'endpoint_url': f'{minio_url}'}
    s3_options = {
        'key': access_key,
        'secret': secret_key,
        'client_kwargs': endpoint_settings,
    }
    destination = f"s3://{bucket_name}/{path}/dataset.csv"
    # The index is not part of the dataset, so it is left out of the CSV.
    df.to_csv(destination, storage_options=s3_options, index=False)
# Read the three input datasets. fetch_dataset expects the dataset folder
# first, then URL, access key, secret key and bucket; the folder is exposed
# by argparse as args.input_dataset_<n>.
input_dataset_1 = fetch_dataset(args.input_dataset_1, args.input_dataset_1_minio_url, args.input_dataset_1_access_key, args.input_dataset_1_secret_key, args.input_dataset_1_minio_bucket, ".csv")
input_dataset_2 = fetch_dataset(args.input_dataset_2, args.input_dataset_2_minio_url, args.input_dataset_2_access_key, args.input_dataset_2_secret_key, args.input_dataset_2_minio_bucket, ".csv")
input_dataset_3 = fetch_dataset(args.input_dataset_3, args.input_dataset_3_minio_url, args.input_dataset_3_access_key, args.input_dataset_3_secret_key, args.input_dataset_3_minio_bucket, ".csv")
# Add a "Status" column whose value ("Updated <n>") identifies the dataset
input_dataset_1['Status'] = 'Updated 1'
input_dataset_2['Status'] = 'Updated 2'
input_dataset_3['Status'] = 'Updated 3'
# Write each updated dataset to its own output location
upload_dataset(input_dataset_1, args.output_dataset_1_dataset, args.output_dataset_1_minio_url, args.output_dataset_1_access_key, args.output_dataset_1_secret_key, args.output_dataset_1_minio_bucket)
upload_dataset(input_dataset_2, args.output_dataset_2_dataset, args.output_dataset_2_minio_url, args.output_dataset_2_access_key, args.output_dataset_2_secret_key, args.output_dataset_2_minio_bucket)
upload_dataset(input_dataset_3, args.output_dataset_3_dataset, args.output_dataset_3_minio_url, args.output_dataset_3_access_key, args.output_dataset_3_secret_key, args.output_dataset_3_minio_bucket)
Metamodel
{
"name": "multiple-io-example",
"description": "Multiple IO example",
"mode": "BATCH",
"area": "ANALYSIS",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/multiple-io-example:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"framework": {
"id": 6,
"name": "Python",
"version": "3",
"imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
},
"properties": [
{
"description": "Your input dataset 1",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset-1",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns-1",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your input dataset 2",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset-2",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns-2",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your input dataset 3",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset-3",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns-3",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your output dataset 1",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-dataset-1",
"valueType": "STRING",
"invisible": true
},
{
"description": "Your output dataset 2",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-dataset-2",
"valueType": "STRING",
"invisible": true
},
{
"description": "Your output dataset 3",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-dataset-3",
"valueType": "STRING",
"invisible": true
}
]
}
Es. 7: Input Dataset + Output Model + Workflow Media
Diagramma

Codice Programma Nucleo
import argparse
import json
import os
import uuid
from datetime import datetime

import matplotlib.pyplot as plt
import pandas as pd
from joblib import dump, load
from kafka import KafkaProducer
from minio import Minio
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.neighbors import KNeighborsClassifier

from arguments import args
from producer import send_message
from utilities import minio_ls, prepare_file_metadata, upload_file_to_minio
# Command-line interface of the service.
parser = argparse.ArgumentParser()
# Input dataset: folder plus MinIO connection settings
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_dataset_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_dataset_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket.minIO_SECRET_KEY', dest='input_dataset_secret_key', type=str, required=True)
# Output model: destination plus MinIO connection settings
parser.add_argument('--output-model', dest='output_model', type=str, required=True)
parser.add_argument('--output-model.minio_bucket', dest='output_model_minio_bucket', type=str, required=True)
parser.add_argument('--output-model.minIO_URL', dest='output_model_minio_url', type=str, required=True)
parser.add_argument('--output-model.minIO_ACCESS_KEY', dest='output_model_access_key', type=str, required=True)
parser.add_argument('--output-model.minio_bucket.minIO_SECRET_KEY', dest='output_model_secret_key', type=str, required=True)
# go_manager: Kafka brokers/topic and the MinIO location used for the media file
parser.add_argument('--go_manager.brokers', dest='go_manager_brokers', type=str, required=True)
parser.add_argument('--go_manager_topic', dest='go_manager_topic', type=str, required=True)
parser.add_argument('--go_manager', dest='minio_path', type=str, required=True)
parser.add_argument('--go_manager.minio_bucket', dest='minio_bucket', type=str, required=True)
parser.add_argument('--go_manager.minIO_URL', dest='minio_url', type=str, required=True)
parser.add_argument('--go_manager.minIO_ACCESS_KEY', dest='minio_access_key', type=str, required=True)
parser.add_argument('--go_manager.minIO_SECRET_KEY', dest='minio_secret_key', type=str, required=True)
# Application parameters (keyword arguments are separated by commas)
parser.add_argument("--labelCol", dest='labelCol', help="Name of the column containing the label", type=str, required=True)
parser.add_argument("--direct_args_to_sklearn_function", dest='direct_args_to_sklearn_function', help="This args are going to be injected to the KNeighborsClassifier.fit function directly", type=str, default='{"n_neighbors":3}')
# Unrecognised options are collected in `unknown` instead of aborting the run.
args, unknown = parser.parse_known_args()
def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List the objects under a MinIO folder whose name ends with an extension.

    Args:
        address: MinIO endpoint handed to the ``Minio`` client.
        access_key: Access key used to authenticate.
        secret_key: Secret key used to authenticate.
        bucket_name: Bucket to search.
        folder: Prefix to list; a trailing "/" is appended when missing.
        extention: Suffix the object names must end with (e.g. ".csv").
        use_ssl: Forwarded to the client as ``secure``.

    Returns:
        A single "s3://<bucket>/<object>" string when exactly one object
        matches, or a list of such strings when several match.

    Raises:
        Exception: If no object matches ("Dataset is empty!").
    """
    # endswith() also copes with an empty folder, where folder[-1] would
    # fail with an IndexError.
    if not folder.endswith("/"):
        folder = folder + "/"
    client = Minio(
        address,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl
    )
    objects = client.list_objects(bucket_name=bucket_name, prefix=folder)
    # Rely on the public object_name property instead of the private
    # _object_name attribute.
    uris = [
        "s3://" + bucket_name + "/" + obj.object_name
        for obj in objects
        if obj.object_name.endswith(extention)
    ]
    if not uris:
        raise Exception("Dataset is empty!")
    # Callers receive a bare string for a single match and a list otherwise.
    return uris[0] if len(uris) == 1 else uris
def upload_file_to_minio(minio_url, access_key, secret_key, bucket_name, object_name, local_file_path):
    """Upload a local file to MinIO as ``object_name`` in ``bucket_name``.

    The connection is always opened with ``secure=False``.
    """
    minio_client = Minio(
        minio_url,
        access_key=access_key,
        secret_key=secret_key,
        secure=False,
    )
    minio_client.fput_object(bucket_name, object_name, local_file_path)
def prepare_metadata_json(name, messageType, title=None, description=None, var=None, show=True):
    """Build the metadata dictionary that describes a message.

    The base fields come from the arguments, a freshly generated UUID and the
    current time. Execution-context fields are added only for the environment
    variables that are actually set.

    Returns:
        The metadata dictionary.
    """
    # Environment variable -> metadata field, in the order the fields are added.
    env_fields = (
        ("BDA_ID", 'bdaId'),
        ("SERVICE_ID", 'serviceId'),
        ("ORGANIZATION_ID", 'organizationId'),
        ("OWNER_ID", 'ownerId'),
        ("EXECUTION_ID", 'executionId'),
        ("ACCESS_LEVEL", 'accessLevel'),
        ("EXECUTOR_ID", 'executorId'),
        ("EXECUTOR_NAME", 'executorName'),
        ("EXECUTOR_ORG_ID", 'executorOrgId'),
        ("EXECUTOR_ORG_NAME", 'executorOrgName'),
    )

    # The key is the lower-cased name with spaces turned into dashes.
    slug = name.lower().replace(" ", "-")
    metadata = {
        "name": name,
        "key": slug,
        "uuid": str(uuid.uuid4()),
        "messageType": messageType,
        "title": title,
        "description": description,
        "var": var,
        "created": str(datetime.now()),
        "modified": str(datetime.now()),
        "show": show,
    }

    for env_name, field in env_fields:
        if env_name in os.environ:
            metadata[field] = os.environ[env_name]
    return metadata
def prepare_file_metadata(localPath, path, extension, filename=None):
    """Build the metadata dictionary for a file message.

    Args:
        localPath: Path of the file on the local filesystem.
        path: Path stored in the metadata under "path".
        extension: Value stored in the metadata under "extension".
        filename: Value stored under "filename". Optional, because the call
            site in this example passes only three arguments; defaults to the
            base name of ``localPath``.

    Returns:
        The dictionary produced by ``prepare_metadata_json`` with the four
        file fields added.
    """
    if filename is None:
        filename = os.path.basename(localPath)
    # NOTE(review): the metadata name is hard-coded and does not depend on the
    # file being described - confirm whether it should follow `filename`.
    metadata = prepare_metadata_json(name="validation_report.json", messageType="file")
    metadata['localPath'] = localPath
    metadata['path'] = path
    metadata['extension'] = extension
    metadata['filename'] = filename
    return metadata
# Kafka producer shared by send_message; the brokers arrive as one
# comma-separated string.
producer = KafkaProducer(bootstrap_servers=args.go_manager_brokers.split(","))


def send_message(data):
    """Serialize ``data`` to UTF-8 JSON, publish it on the go_manager topic and flush."""
    payload = json.dumps(data).encode('utf-8')
    producer.send(args.go_manager_topic, payload)
    producer.flush()
# Load the input dataset (attribute names follow the dest values declared in argparse)
storage_options = {
    'key': args.input_dataset_access_key,
    'secret': args.input_dataset_secret_key,
    'client_kwargs': {
        'endpoint_url': f'{args.input_dataset_minio_url}'
    }
}
file_path = minio_ls(args.input_dataset_minio_url, args.input_dataset_access_key, args.input_dataset_secret_key, args.input_dataset_minio_bucket, args.input_dataset, ".csv")
dataset = pd.read_csv(file_path, storage_options=storage_options)
# Split the label column from the features
y = dataset[args.labelCol]
X = dataset.drop(args.labelCol, axis=1)
# The JSON object holds keyword arguments (e.g. {"n_neighbors": 3}), so it
# must be unpacked rather than passed as a single positional value
neigh = KNeighborsClassifier(**json.loads(args.direct_args_to_sklearn_function))
neigh.fit(X=X, y=y)
# Save the plot to a local path
disp = ConfusionMatrixDisplay.from_estimator(neigh, X, y)
disp.plot()
disp.ax_.set_title("Confusion Matrix")
plt.savefig("conf_matrix.png")
# prepare_file_metadata takes localPath, path, extension and filename
metadata = prepare_file_metadata("conf_matrix.png", "conf_matrix.png", "picture", "conf_matrix.png")
# Send the plot to MinIO (the bucket is exposed by argparse as args.minio_bucket)
upload_file_to_minio(args.minio_url, args.minio_access_key, args.minio_secret_key, args.minio_bucket, args.minio_path, 'conf_matrix.png')
send_message(metadata)
# Save the trained model locally as a .joblib file
model_path = "./temp"
os.makedirs(model_path, exist_ok=True)
model_file = os.path.join(model_path, 'model.joblib')
dump(neigh, model_file)
# Upload to MinIO: the model file itself must be passed, not its directory
upload_file_to_minio(args.output_model_minio_url, args.output_model_access_key, args.output_model_secret_key, args.output_model_minio_bucket, args.output_model, model_file)
Metamodel
{
"name": "knn-fit",
"description": "Knn model training",
"mode": "BATCH",
"area": "ANALYSIS",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/knn-fit:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"framework": {
"id": 6,
"name": "Python",
"version": "3",
"imageUrl": "https://cdn.alidalab.it/static/images/frameworks/python_logo.png"
},
"properties": [
{
"description": "Your input dataset description.",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your model description",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-model",
"valueType": "STRING",
"invisible": true
},
{
"description": "Name of the column containing the label",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "labelCol",
"valueType": "STRING"
},
{
"description": "This args are going to be injected to the KNeighborsClassifier.fit function directly",
"mandatory": true,
"type": "application",
"defaultValue": "{\"n_neighbors\":3}",
"key": "direct_args_to_sklearn_function",
"valueType": "STRING"
}
]
}
Es. 8: Generic Assets
Diagramma

Codice Programma Nucleo
package main
import (
"encoding/csv"
"fmt"
"os"
"path/filepath"
)
// appendHelloWorld reads the first CSV file found in inputDir, appends a
// single-field row containing "hello world" and writes the result to
// output.csv inside outputDir. Every failure is reported on standard output
// and ends the function early.
func appendHelloWorld(inputDir, outputDir string) {
	// Look for CSV files in the input directory.
	pattern := filepath.Join(inputDir, "*.csv")
	matches, globErr := filepath.Glob(pattern)
	if globErr != nil {
		fmt.Println("Errore nella ricerca dei file:", globErr)
		return
	}
	if len(matches) == 0 {
		fmt.Println("Nessun file CSV trovato nella directory di input")
		return
	}

	// Only the first match is processed.
	sourcePath := matches[0]
	targetPath := filepath.Join(outputDir, "output.csv")

	// Read every record of the input file.
	source, openErr := os.Open(sourcePath)
	if openErr != nil {
		fmt.Println("Errore nell'apertura del file:", openErr)
		return
	}
	defer source.Close()

	records, readErr := csv.NewReader(source).ReadAll()
	if readErr != nil {
		fmt.Println("Errore nella lettura del file:", readErr)
		return
	}

	// Append the extra row.
	records = append(records, []string{"hello world"})

	// Make sure the output directory exists.
	if mkdirErr := os.MkdirAll(outputDir, os.ModePerm); mkdirErr != nil {
		fmt.Println("Errore nella creazione della directory di output:", mkdirErr)
		return
	}

	// Write the modified CSV.
	target, createErr := os.Create(targetPath)
	if createErr != nil {
		fmt.Println("Errore nella creazione del file di output:", createErr)
		return
	}
	defer target.Close()

	if writeErr := csv.NewWriter(target).WriteAll(records); writeErr != nil {
		fmt.Println("Errore nella scrittura del file CSV:", writeErr)
		return
	}
	fmt.Println("File salvato:", targetPath)
}
// main runs appendHelloWorld on the fixed /inputs and /outputs directories,
// the same paths declared in the "assets" property of the metamodel.
func main() {
	const (
		inputDir  = "/inputs"
		outputDir = "/outputs"
	)
	appendHelloWorld(inputDir, outputDir)
}
Metamodel
{
"name": "generic-go-ex1",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"area": "PREPARATION",
"description": "",
"mode": "BATCH",
"properties": [
{
"defaultValue": "{\"inputs\": [{\"name\": \"input-data\", \"path\": \"/inputs\", \"type\": \"dataset\",\"description\": \"my description\"}], \"outputs\": [{\"name\": \"output-data\", \"path\": \"/outputs\", \"type\": \"dataset\",\"description\": \"my description\"}]}",
"description": "",
"externalized": false,
"extra": null,
"invisible": true,
"key": "assets",
"mandatory": true,
"type": "application",
"uri": null,
"value": null,
"valueType": "JSON"
}
],
"ref": null,
"statusModel": null,
"tags": [],
"url": "docker://gitlab.alidalab.it:5000/alida/analytics2/example-go-1:latest"
}