Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 137 additions & 40 deletions lab_server/lab_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,23 @@
from machines import HumanPlateServer, TecanFluent480, OpentronsOT2, TecanInfinite200Pro, HumanStoreLabware
from util import calculate_md5
from lib_operator import Operator
from storage_service import StorageService, get_storage
from time import sleep
from random import uniform
# from lib_operator import Operator
# from .operator import Operator
import yaml
import requests
import random
import os

LOG_SERVER_URL = 'http://log_server:8000'

# ストレージサービスの初期化(シングルトン)
# 環境変数STORAGE_MODEで 's3' または 'local' を指定
storage = get_storage()
print(f"Storage service initialized: mode={storage.mode}")

app = FastAPI()


Expand All @@ -46,6 +53,7 @@ class Operation:
finished_at: str | None
status: str
storage_address: str
run_storage_address: str # Runのstorage_address(親パス)
is_transport: bool
is_data: bool

Expand All @@ -56,7 +64,8 @@ def __init__(
name,
storage_address,
is_transport,
is_data
is_data,
run_storage_address: str = ""
):
self.process_db_id = process_db_id
self.process_name = process_name
Expand All @@ -65,6 +74,7 @@ def __init__(
self.finished_at = None
self.status = "not started"
self.storage_address = storage_address
self.run_storage_address = run_storage_address
self.is_transport = is_transport
self.is_data = is_data

Expand Down Expand Up @@ -97,7 +107,8 @@ def post(self):
)

self.db_id = response_data["id"]
self.storage_address = f"/storage/operations/{self.db_id}"
# storage_addressを統一形式に変更: runs/{run_id}/operations/{op_id}/
self.storage_address = f"{self.run_storage_address}operations/{self.db_id}/"
requests.patch(
url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}',
data={
Expand All @@ -107,11 +118,9 @@ def post(self):
)

def run(self):

self.started_at = datetime.now().isoformat()
self.status = "running"
storage_path = Path(self.storage_address)
storage_path.mkdir(parents=True, exist_ok=True)

requests.patch(
url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}',
data={
Expand All @@ -126,21 +135,31 @@ def run(self):
"new_value": self.status
}
)

# シミュレーション: ランダムな実行時間
running_time = uniform(1, 3)
sleep(running_time)

self.finished_at = datetime.now().isoformat()
self.status = "completed"
with open(storage_path / "log.txt", "w") as f:
f.write(f"Operation {self.name} completed at {self.finished_at}")
with open(storage_path / "log.txt", "r") as f:
log = f.read()
requests.patch(
url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}',
data={
"attribute": "log",
"new_value": log
}
)

# ログ内容を生成
log_content = f"Operation {self.name} completed at {self.finished_at}"

# StorageServiceを使用してログを保存(S3またはローカル)
log_path = f"{self.storage_address}log.txt"
storage.save(log_path, log_content.encode('utf-8'), content_type='text/plain')
print(f"Operation log saved: {log_path}")

# DBにもログを保存(既存の動作を維持)
requests.patch(
url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}',
data={
"attribute": "log",
"new_value": log_content
}
)

requests.patch(
url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}',
data={
Expand All @@ -163,12 +182,14 @@ class Process:
type: str
id_in_protocol: str
storage_address: str
run_storage_address: str # Runのstorage_address(親パス)

def __init__(self, run_id, type, id_in_protocol, storage_address):
def __init__(self, run_id, type, id_in_protocol, storage_address, run_storage_address: str = ""):
self.run_id = run_id
self.type = type
self.id_in_protocol = id_in_protocol
self.storage_address = storage_address
self.run_storage_address = run_storage_address

def post(self):
response = requests.post(
Expand Down Expand Up @@ -196,7 +217,8 @@ def post(self):
)

self.db_id = response_data["id"]
self.storage_address = f"/storage/processes/{self.db_id}"
# storage_addressを統一形式に変更: runs/{run_id}/processes/{process_id}/
self.storage_address = f"{self.run_storage_address}processes/{self.db_id}/"
requests.patch(
url=f'{LOG_SERVER_URL}/api/processes/{self.db_id}',
data={
Expand All @@ -211,24 +233,26 @@ def operation_mapping(self, machines: List[Operator]) -> Operation:
process_db_id=self.db_id,
process_name=self.id_in_protocol,
name=self.id_in_protocol,
storage_address='storage/operation',
storage_address='', # post()で設定される
is_transport=False,
is_data=False
is_data=False,
run_storage_address=self.run_storage_address
)
return operation
suit_machine = random.choice([machine for machine in machines if machine.type == self.type])
operation = Operation(
process_db_id=self.db_id,
process_name=self.id_in_protocol,
name=suit_machine.id,
storage_address='storage/operation',
storage_address='', # post()で設定される
is_transport=False,
is_data=False
is_data=False,
run_storage_address=self.run_storage_address
)
return operation


def connection_to_operation(connection_list: List[Connection], process_list: List[Process], operation_list: List[Operation]):
def connection_to_operation(connection_list: List[Connection], process_list: List[Process], operation_list: List[Operation], run_storage_address: str = ""):
connections = [{
"input_source": connection['input'][0],
"input_content": connection['input'][1],
Expand All @@ -244,9 +268,10 @@ def connection_to_operation(connection_list: List[Connection], process_list: Lis
process_db_id=source_process.db_id,
process_name=source_process.id_in_protocol,
name=f"{connection['input_source']}_{connection['input_content']}_{connection['output_source']}_{connection['output_content']}",
storage_address='storage/operation',
storage_address='', # post()で設定される
is_transport=True,
is_data=connection["is_data"]
is_data=connection["is_data"],
run_storage_address=run_storage_address
)
operation_name_from = [operation.name for operation in operation_list if operation.process_name == connection['input_source']][0]
operation_name_to = [operation.name for operation in operation_list if operation.process_name == connection['output_source']][0]
Expand All @@ -260,7 +285,7 @@ def connection_to_operation(connection_list: List[Connection], process_list: Lis
return operation_list_from_connection, edge_list


def create_process_and_operation_and_edge(run_id, protocol_dict, machines):
def create_process_and_operation_and_edge(run_id, protocol_dict, machines, run_storage_address: str = ""):
processes = protocol_dict["operations"]
connections = protocol_dict["connections"]

Expand All @@ -269,28 +294,31 @@ def create_process_and_operation_and_edge(run_id, protocol_dict, machines):
run_id=run_id,
type=process["type"],
id_in_protocol=process["id"],
storage_address=f'process/{process["id"]}'
storage_address='', # post()で設定される
run_storage_address=run_storage_address
) for process in processes
]

input_process = Process(
run_id=run_id,
type="input",
id_in_protocol="input",
storage_address=""
storage_address="",
run_storage_address=run_storage_address
)
output_process = Process(
run_id=run_id,
type="output",
id_in_protocol="output",
storage_address=""
storage_address="",
run_storage_address=run_storage_address
)

process_list += [input_process, output_process]
[process.post() for process in process_list]

operation_list = [process.operation_mapping(machines=machines) for process in process_list]
operation_list_from_connection, edge_list = connection_to_operation(connections, process_list, operation_list)
operation_list_from_connection, edge_list = connection_to_operation(connections, process_list, operation_list, run_storage_address)
operation_list += operation_list_from_connection
[operation.post() for operation in operation_list]

Expand Down Expand Up @@ -369,28 +397,61 @@ async def calc_md5_from_file(file: UploadFile = File(...)):
return md5


def upload_file(content: bytes, path: str, content_type: str = 'text/plain') -> bool:
"""
ファイルをストレージにアップロード(StorageService経由)

Args:
content: アップロードするファイルの内容(bytes)
path: ストレージパス(相対パス形式)
content_type: コンテントタイプ

Returns:
成功時True、失敗時False
"""
result = storage.save(path, content, content_type=content_type)
if result:
print(f"File upload success: {path}")
else:
print(f"File upload failed: {path}")
return result


async def upload_yaml_file(file: UploadFile, storage_address: str, filename: str) -> bool:
"""
UploadFileからYAMLファイルをストレージにアップロード

Args:
file: アップロードされたファイル
storage_address: ストレージのベースパス(例: runs/1/)
filename: 保存するファイル名

Returns:
成功時True、失敗時False
"""
await file.seek(0)
content = await file.read()
await file.seek(0)

path = f"{storage_address}{filename}"
return upload_file(content, path, content_type='application/x-yaml')


@app.post("/run_experiment")
async def run_experiment(project_id: int, protocol_name, user_id: int, protocol_yaml: UploadFile = File(...), manipulate_yaml: UploadFile = File(...)):
protocol_md5 = await calc_md5_from_file(protocol_yaml)
protocol = await read_uploaded_yaml(protocol_yaml)
manipulates = await read_uploaded_yaml(manipulate_yaml)
storage_address = "https://drive.google.com/drive/folders/18dhaS7ZKYonfebM4oV5raU79CZQdrHJK?usp=sharing"
machines = [
HumanPlateServer("human_plate_server", manipulates, storage_address),
TecanFluent480("tecan_fluent_480", manipulates, storage_address),
OpentronsOT2("opentrons_ot2", manipulates, storage_address),
TecanInfinite200Pro("tecan_infinite_200_pro", manipulates, storage_address),
HumanStoreLabware("human_store_labware", manipulates, storage_address),
]

# 一旦空のstorage_addressでRun作成(run_id取得後に更新)
response = requests.post(
url=f'{LOG_SERVER_URL}/api/runs/',
data={
"project_id": project_id,
"file_name": protocol_name,
"checksum": protocol_md5,
"user_id": user_id,
"storage_address": storage_address
"storage_address": "" # 一旦空で作成
}
)

Expand All @@ -409,10 +470,44 @@ async def run_experiment(project_id: int, protocol_name, user_id: int, protocol_
)

run_id = response_data["id"]

# storage_addressを動的に生成(統一形式: runs/{run_id}/)
run_storage_address = f"runs/{run_id}/"

# storage_addressを更新
update_response = requests.patch(
url=f'{LOG_SERVER_URL}/api/runs/{run_id}',
data={"attribute": "storage_address", "new_value": run_storage_address}
)
if update_response.status_code != 200:
print(f"Warning: Failed to update storage_address for run {run_id}: {update_response.text}")

# storage_modeを更新(現在のストレージモードを記録)
update_mode_response = requests.patch(
url=f'{LOG_SERVER_URL}/api/runs/{run_id}',
data={"attribute": "storage_mode", "new_value": storage.mode}
)
if update_mode_response.status_code != 200:
print(f"Warning: Failed to update storage_mode for run {run_id}: {update_mode_response.text}")

# プロトコルファイルをストレージにアップロード(StorageService経由)
print(f"Uploading YAML files to storage: {run_storage_address}")
await upload_yaml_file(protocol_yaml, run_storage_address, "protocol.yaml")
await upload_yaml_file(manipulate_yaml, run_storage_address, "manipulate.yaml")

# マシン初期化(storage_addressは動的生成された値を使用)
machines = [
HumanPlateServer("human_plate_server", manipulates, run_storage_address),
TecanFluent480("tecan_fluent_480", manipulates, run_storage_address),
OpentronsOT2("opentrons_ot2", manipulates, run_storage_address),
TecanInfinite200Pro("tecan_infinite_200_pro", manipulates, run_storage_address),
HumanStoreLabware("human_store_labware", manipulates, run_storage_address),
]
operation_list, edge_list = create_process_and_operation_and_edge(
run_id=run_id,
protocol_dict=protocol,
machines=machines
machines=machines,
run_storage_address=run_storage_address
)
plan = create_plan(edge_list)
run_start_time = datetime.now().isoformat()
Expand All @@ -424,3 +519,5 @@ async def run_experiment(project_id: int, protocol_name, user_id: int, protocol_
run_finish_time = datetime.now().isoformat()
requests.patch(url=f'{LOG_SERVER_URL}/api/runs/{run_id}', data={"attribute": "finished_at", "new_value": run_finish_time})
requests.patch(url=f'{LOG_SERVER_URL}/api/runs/{run_id}', data={"attribute": "status", "new_value": "completed"})

return {"run_id": run_id, "storage_address": run_storage_address, "status": "completed"}
Loading