diff --git a/lab_server/lab_server.py b/lab_server/lab_server.py index 999db33..9e31e73 100644 --- a/lab_server/lab_server.py +++ b/lab_server/lab_server.py @@ -11,6 +11,7 @@ from machines import HumanPlateServer, TecanFluent480, OpentronsOT2, TecanInfinite200Pro, HumanStoreLabware from util import calculate_md5 from lib_operator import Operator +from storage_service import StorageService, get_storage from time import sleep from random import uniform # from lib_operator import Operator @@ -18,9 +19,15 @@ import yaml import requests import random +import os LOG_SERVER_URL = 'http://log_server:8000' +# ストレージサービスの初期化(シングルトン) +# 環境変数STORAGE_MODEで 's3' または 'local' を指定 +storage = get_storage() +print(f"Storage service initialized: mode={storage.mode}") + app = FastAPI() @@ -46,6 +53,7 @@ class Operation: finished_at: str | None status: str storage_address: str + run_storage_address: str # Runのstorage_address(親パス) is_transport: bool is_data: bool @@ -56,7 +64,8 @@ def __init__( name, storage_address, is_transport, - is_data + is_data, + run_storage_address: str = "" ): self.process_db_id = process_db_id self.process_name = process_name @@ -65,6 +74,7 @@ def __init__( self.finished_at = None self.status = "not started" self.storage_address = storage_address + self.run_storage_address = run_storage_address self.is_transport = is_transport self.is_data = is_data @@ -97,7 +107,8 @@ def post(self): ) self.db_id = response_data["id"] - self.storage_address = f"/storage/operations/{self.db_id}" + # storage_addressを統一形式に変更: runs/{run_id}/operations/{op_id}/ + self.storage_address = f"{self.run_storage_address}operations/{self.db_id}/" requests.patch( url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}', data={ @@ -107,11 +118,9 @@ def post(self): ) def run(self): - self.started_at = datetime.now().isoformat() self.status = "running" - storage_path = Path(self.storage_address) - storage_path.mkdir(parents=True, exist_ok=True) + requests.patch( 
url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}', data={ @@ -126,21 +135,31 @@ def run(self): "new_value": self.status } ) + + # シミュレーション: ランダムな実行時間 running_time = uniform(1, 3) sleep(running_time) + self.finished_at = datetime.now().isoformat() self.status = "completed" - with open(storage_path / "log.txt", "w") as f: - f.write(f"Operation {self.name} completed at {self.finished_at}") - with open(storage_path / "log.txt", "r") as f: - log = f.read() - requests.patch( - url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}', - data={ - "attribute": "log", - "new_value": log - } - ) + + # ログ内容を生成 + log_content = f"Operation {self.name} completed at {self.finished_at}" + + # StorageServiceを使用してログを保存(S3またはローカル) + log_path = f"{self.storage_address}log.txt" + storage.save(log_path, log_content.encode('utf-8'), content_type='text/plain') + print(f"Operation log saved: {log_path}") + + # DBにもログを保存(既存の動作を維持) + requests.patch( + url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}', + data={ + "attribute": "log", + "new_value": log_content + } + ) + requests.patch( url=f'{LOG_SERVER_URL}/api/operations/{self.db_id}', data={ @@ -163,12 +182,14 @@ class Process: type: str id_in_protocol: str storage_address: str + run_storage_address: str # Runのstorage_address(親パス) - def __init__(self, run_id, type, id_in_protocol, storage_address): + def __init__(self, run_id, type, id_in_protocol, storage_address, run_storage_address: str = ""): self.run_id = run_id self.type = type self.id_in_protocol = id_in_protocol self.storage_address = storage_address + self.run_storage_address = run_storage_address def post(self): response = requests.post( @@ -196,7 +217,8 @@ def post(self): ) self.db_id = response_data["id"] - self.storage_address = f"/storage/processes/{self.db_id}" + # storage_addressを統一形式に変更: runs/{run_id}/processes/{process_id}/ + self.storage_address = f"{self.run_storage_address}processes/{self.db_id}/" requests.patch( url=f'{LOG_SERVER_URL}/api/processes/{self.db_id}', data={ 
@@ -211,9 +233,10 @@ def operation_mapping(self, machines: List[Operator]) -> Operation: process_db_id=self.db_id, process_name=self.id_in_protocol, name=self.id_in_protocol, - storage_address='storage/operation', + storage_address='', # post()で設定される is_transport=False, - is_data=False + is_data=False, + run_storage_address=self.run_storage_address ) return operation suit_machine = random.choice([machine for machine in machines if machine.type == self.type]) @@ -221,14 +244,15 @@ def operation_mapping(self, machines: List[Operator]) -> Operation: process_db_id=self.db_id, process_name=self.id_in_protocol, name=suit_machine.id, - storage_address='storage/operation', + storage_address='', # post()で設定される is_transport=False, - is_data=False + is_data=False, + run_storage_address=self.run_storage_address ) return operation -def connection_to_operation(connection_list: List[Connection], process_list: List[Process], operation_list: List[Operation]): +def connection_to_operation(connection_list: List[Connection], process_list: List[Process], operation_list: List[Operation], run_storage_address: str = ""): connections = [{ "input_source": connection['input'][0], "input_content": connection['input'][1], @@ -244,9 +268,10 @@ def connection_to_operation(connection_list: List[Connection], process_list: Lis process_db_id=source_process.db_id, process_name=source_process.id_in_protocol, name=f"{connection['input_source']}_{connection['input_content']}_{connection['output_source']}_{connection['output_content']}", - storage_address='storage/operation', + storage_address='', # post()で設定される is_transport=True, - is_data=connection["is_data"] + is_data=connection["is_data"], + run_storage_address=run_storage_address ) operation_name_from = [operation.name for operation in operation_list if operation.process_name == connection['input_source']][0] operation_name_to = [operation.name for operation in operation_list if operation.process_name == connection['output_source']][0] @@ -260,7 
+285,7 @@ def connection_to_operation(connection_list: List[Connection], process_list: Lis return operation_list_from_connection, edge_list -def create_process_and_operation_and_edge(run_id, protocol_dict, machines): +def create_process_and_operation_and_edge(run_id, protocol_dict, machines, run_storage_address: str = ""): processes = protocol_dict["operations"] connections = protocol_dict["connections"] @@ -269,7 +294,8 @@ def create_process_and_operation_and_edge(run_id, protocol_dict, machines): run_id=run_id, type=process["type"], id_in_protocol=process["id"], - storage_address=f'process/{process["id"]}' + storage_address='', # post()で設定される + run_storage_address=run_storage_address ) for process in processes ] @@ -277,20 +303,22 @@ def create_process_and_operation_and_edge(run_id, protocol_dict, machines): run_id=run_id, type="input", id_in_protocol="input", - storage_address="" + storage_address="", + run_storage_address=run_storage_address ) output_process = Process( run_id=run_id, type="output", id_in_protocol="output", - storage_address="" + storage_address="", + run_storage_address=run_storage_address ) process_list += [input_process, output_process] [process.post() for process in process_list] operation_list = [process.operation_mapping(machines=machines) for process in process_list] - operation_list_from_connection, edge_list = connection_to_operation(connections, process_list, operation_list) + operation_list_from_connection, edge_list = connection_to_operation(connections, process_list, operation_list, run_storage_address) operation_list += operation_list_from_connection [operation.post() for operation in operation_list] @@ -369,20 +397,53 @@ async def calc_md5_from_file(file: UploadFile = File(...)): return md5 +def upload_file(content: bytes, path: str, content_type: str = 'text/plain') -> bool: + """ + ファイルをストレージにアップロード(StorageService経由) + + Args: + content: アップロードするファイルの内容(bytes) + path: ストレージパス(相対パス形式) + content_type: コンテントタイプ + + Returns: + 
成功時True、失敗時False + """ + result = storage.save(path, content, content_type=content_type) + if result: + print(f"File upload success: {path}") + else: + print(f"File upload failed: {path}") + return result + + +async def upload_yaml_file(file: UploadFile, storage_address: str, filename: str) -> bool: + """ + UploadFileからYAMLファイルをストレージにアップロード + + Args: + file: アップロードされたファイル + storage_address: ストレージのベースパス(例: runs/1/) + filename: 保存するファイル名 + + Returns: + 成功時True、失敗時False + """ + await file.seek(0) + content = await file.read() + await file.seek(0) + + path = f"{storage_address}{filename}" + return upload_file(content, path, content_type='application/x-yaml') + + @app.post("/run_experiment") async def run_experiment(project_id: int, protocol_name, user_id: int, protocol_yaml: UploadFile = File(...), manipulate_yaml: UploadFile = File(...)): protocol_md5 = await calc_md5_from_file(protocol_yaml) protocol = await read_uploaded_yaml(protocol_yaml) manipulates = await read_uploaded_yaml(manipulate_yaml) - storage_address = "https://drive.google.com/drive/folders/18dhaS7ZKYonfebM4oV5raU79CZQdrHJK?usp=sharing" - machines = [ - HumanPlateServer("human_plate_server", manipulates, storage_address), - TecanFluent480("tecan_fluent_480", manipulates, storage_address), - OpentronsOT2("opentrons_ot2", manipulates, storage_address), - TecanInfinite200Pro("tecan_infinite_200_pro", manipulates, storage_address), - HumanStoreLabware("human_store_labware", manipulates, storage_address), - ] + # 一旦空のstorage_addressでRun作成(run_id取得後に更新) response = requests.post( url=f'{LOG_SERVER_URL}/api/runs/', data={ @@ -390,7 +451,7 @@ async def run_experiment(project_id: int, protocol_name, user_id: int, protocol_ "file_name": protocol_name, "checksum": protocol_md5, "user_id": user_id, - "storage_address": storage_address + "storage_address": "" # 一旦空で作成 } ) @@ -409,10 +470,44 @@ async def run_experiment(project_id: int, protocol_name, user_id: int, protocol_ ) run_id = response_data["id"] + + # 
storage_addressを動的に生成(統一形式: runs/{run_id}/) + run_storage_address = f"runs/{run_id}/" + + # storage_addressを更新 + update_response = requests.patch( + url=f'{LOG_SERVER_URL}/api/runs/{run_id}', + data={"attribute": "storage_address", "new_value": run_storage_address} + ) + if update_response.status_code != 200: + print(f"Warning: Failed to update storage_address for run {run_id}: {update_response.text}") + + # storage_modeを更新(現在のストレージモードを記録) + update_mode_response = requests.patch( + url=f'{LOG_SERVER_URL}/api/runs/{run_id}', + data={"attribute": "storage_mode", "new_value": storage.mode} + ) + if update_mode_response.status_code != 200: + print(f"Warning: Failed to update storage_mode for run {run_id}: {update_mode_response.text}") + + # プロトコルファイルをストレージにアップロード(StorageService経由) + print(f"Uploading YAML files to storage: {run_storage_address}") + await upload_yaml_file(protocol_yaml, run_storage_address, "protocol.yaml") + await upload_yaml_file(manipulate_yaml, run_storage_address, "manipulate.yaml") + + # マシン初期化(storage_addressは動的生成された値を使用) + machines = [ + HumanPlateServer("human_plate_server", manipulates, run_storage_address), + TecanFluent480("tecan_fluent_480", manipulates, run_storage_address), + OpentronsOT2("opentrons_ot2", manipulates, run_storage_address), + TecanInfinite200Pro("tecan_infinite_200_pro", manipulates, run_storage_address), + HumanStoreLabware("human_store_labware", manipulates, run_storage_address), + ] operation_list, edge_list = create_process_and_operation_and_edge( run_id=run_id, protocol_dict=protocol, - machines=machines + machines=machines, + run_storage_address=run_storage_address ) plan = create_plan(edge_list) run_start_time = datetime.now().isoformat() @@ -424,3 +519,5 @@ async def run_experiment(project_id: int, protocol_name, user_id: int, protocol_ run_finish_time = datetime.now().isoformat() requests.patch(url=f'{LOG_SERVER_URL}/api/runs/{run_id}', data={"attribute": "finished_at", "new_value": run_finish_time}) 
requests.patch(url=f'{LOG_SERVER_URL}/api/runs/{run_id}', data={"attribute": "status", "new_value": "completed"}) + + return {"run_id": run_id, "storage_address": run_storage_address, "status": "completed"} diff --git a/lab_server/lib_operator.py b/lab_server/lib_operator.py index d1ce5ed..10f4039 100644 --- a/lab_server/lib_operator.py +++ b/lab_server/lib_operator.py @@ -1,34 +1,72 @@ +"""オペレータークラス + +実験機器(マシン)のシミュレーションを行うクラス。 +StorageServiceを使用してメタデータを保存する。 +""" + from typing import List from time import sleep from random import uniform -from pathlib import Path +import json + +from storage_service import get_storage class Operator: + """実験機器(オペレーター)の基底クラス""" + id: str type: str task_input: List[str] task_output: List[str] - storage_address: Path + storage_address: str # 相対パス形式(例: runs/1/operators/tecan_infinite_200_pro/) + + def __init__(self, id: str, type: str, manipulate_list: List[dict], storage_address: str): + """ + オペレーターを初期化 - def __init__(self, id, type, manipulate_list, storage_address): + Args: + id: オペレーターID(例: tecan_infinite_200_pro) + type: オペレータータイプ + manipulate_list: マニピュレート設定リスト + storage_address: 親のstorage_address(例: runs/1/) + """ self.id = id self.type = type - self.storage_address = storage_address / Path(id) + # storage_addressを統一形式で設定: runs/{run_id}/operators/{operator_id}/ + self.storage_address = f"{storage_address}operators/{id}/" + # 該当するmanipulateが1つしかないことを想定している。 - manipulate = [manipulate for manipulate in manipulate_list if manipulate['name'] == type][0] + manipulate = [m for m in manipulate_list if m['name'] == type][0] if manipulate.get('input'): self.task_input = [input['id'] for input in manipulate['input']] if manipulate.get('output'): self.task_output = [output['id'] for output in manipulate['output']] def run(self): - Path(self.storage_address).mkdir(parents=True) - metadata_path = Path(self.storage_address) / Path('metadata.json') - # ランダムな時間だけ待つ + """ + オペレーターを実行し、メタデータを保存する + + Returns: + str: 実行結果 + """ + storage = 
get_storage() + + # ランダムな時間だけ待つ(シミュレーション) running_time = uniform(1, 3) sleep(running_time) - # save metadata - with open(metadata_path, 'w') as file: - file.write('{"metadata": "sample_metadata"}') + + # メタデータを生成 + metadata = { + "operator_id": self.id, + "type": self.type, + "status": "completed", + "metadata": "sample_metadata" + } + + # StorageServiceを使用してメタデータを保存 + metadata_path = f"{self.storage_address}metadata.json" + storage.save_json(metadata_path, metadata) + print(f"Operator metadata saved: {metadata_path}") + return "done" diff --git a/lab_server/requirements.txt b/lab_server/requirements.txt index edaa856..0cd9381 100644 --- a/lab_server/requirements.txt +++ b/lab_server/requirements.txt @@ -39,3 +39,4 @@ uvicorn==0.32.0 uvloop==0.21.0 watchfiles==0.24.0 websockets==13.1 +boto3==1.35.66 diff --git a/lab_server/storage_service.py b/lab_server/storage_service.py new file mode 100644 index 0000000..f0ac20d --- /dev/null +++ b/lab_server/storage_service.py @@ -0,0 +1,27 @@ +"""ストレージサービス抽象化レイヤー(後方互換性ラッパー) + +責任分離型設計: labcode-simはWrite専用(StorageWriter) +storage_writer.pyの機能を再エクスポート。 + +使用例: + from storage_service import StorageService, get_storage + + storage = get_storage() + storage.save("runs/1/log.txt", b"Hello World", content_type="text/plain") + storage.save_json("runs/1/metadata.json", {"key": "value"}) +""" + +# storage_writerから再エクスポート(後方互換性維持) +from storage_writer import ( + StorageWriter, + StorageWriter as StorageService, # 後方互換性エイリアス + get_storage_writer, + get_storage_writer as get_storage, # 後方互換性エイリアス +) + +__all__ = [ + 'StorageWriter', + 'StorageService', + 'get_storage_writer', + 'get_storage', +] diff --git a/lab_server/storage_writer.py b/lab_server/storage_writer.py new file mode 100644 index 0000000..9ed1f13 --- /dev/null +++ b/lab_server/storage_writer.py @@ -0,0 +1,174 @@ +"""軽量ストレージライター(Write専用) + +責任分離型設計: labcode-simはWrite操作のみを担当。 +Read + 管理機能はlabcode-log-serverが担当。 + +シンプルなif-else分岐でS3/ローカルを切り替え。 +レジストリパターンは不要(2モードのみのため)。 + +使用例: 
+ from storage_writer import get_storage_writer + + writer = get_storage_writer() + writer.save("runs/1/log.txt", b"Hello World") + writer.save_text("runs/1/config.yaml", "key: value") + writer.save_json("runs/1/metadata.json", {"key": "value"}) +""" + +import os +import json +import logging +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + + +class StorageWriter: + """軽量ストレージライター(Write専用)""" + + def __init__(self): + """ + 環境変数から設定を読み込み、適切なバックエンドを初期化 + + 環境変数: + - STORAGE_MODE: 's3' (デフォルト) または 'local' + - S3_BUCKET_NAME: S3バケット名 + - S3_ENDPOINT_URL: S3エンドポイントURL(オプション) + - AWS_ACCESS_KEY_ID: AWSアクセスキー + - AWS_SECRET_ACCESS_KEY: AWSシークレットキー + - LOCAL_STORAGE_PATH: ローカルストレージパス + """ + self._mode = os.getenv('STORAGE_MODE', 's3').lower() + + if self._mode == 's3': + self._init_s3() + else: + self._init_local() + + logger.info(f"StorageWriter initialized: mode={self._mode}") + + def _init_s3(self): + """S3バックエンドを初期化""" + import boto3 + + client_kwargs = { + 'aws_access_key_id': os.getenv('AWS_ACCESS_KEY_ID'), + 'aws_secret_access_key': os.getenv('AWS_SECRET_ACCESS_KEY'), + 'region_name': os.getenv('AWS_DEFAULT_REGION', 'ap-northeast-1') + } + + endpoint_url = os.getenv('S3_ENDPOINT_URL') + if endpoint_url: + client_kwargs['endpoint_url'] = endpoint_url + + self._s3_client = boto3.client('s3', **client_kwargs) + self._bucket_name = os.getenv('S3_BUCKET_NAME', 'labcode-dev-artifacts') + + def _init_local(self): + """ローカルバックエンドを初期化""" + base_path = os.getenv('LOCAL_STORAGE_PATH', '/data/storage') + self._base_path = Path(base_path) + self._base_path.mkdir(parents=True, exist_ok=True) + + @property + def mode(self) -> str: + """現在のストレージモードを返す ('s3' または 'local')""" + return self._mode + + def save(self, path: str, content: bytes, content_type: str = 'application/octet-stream') -> bool: + """ + ファイルを保存 + + Args: + path: 保存先パス(相対パス形式、例: 'runs/1/log.txt') + content: ファイル内容(バイト列) + content_type: MIMEタイプ + + Returns: + bool: 
成功時True + """ + if self._mode == 's3': + return self._save_s3(path, content, content_type) + else: + return self._save_local(path, content) + + def _save_s3(self, path: str, content: bytes, content_type: str) -> bool: + """S3に保存""" + try: + self._s3_client.put_object( + Bucket=self._bucket_name, + Key=path, + Body=content, + ContentType=content_type + ) + logger.debug(f"S3 upload success: {path}") + return True + except Exception as e: + logger.error(f"S3 upload failed: {path} - {e}") + return False + + def _save_local(self, path: str, content: bytes) -> bool: + """ローカルファイルシステムに保存""" + try: + full_path = self._base_path / path + full_path.parent.mkdir(parents=True, exist_ok=True) + with open(full_path, 'wb') as f: + f.write(content) + logger.debug(f"Local save success: {path}") + return True + except Exception as e: + logger.error(f"Local save failed: {path} - {e}") + return False + + def save_text(self, path: str, text: str, encoding: str = 'utf-8') -> bool: + """ + テキストファイルを保存 + + Args: + path: 保存先パス + text: テキスト内容 + encoding: 文字エンコーディング + + Returns: + bool: 成功時True + """ + return self.save(path, text.encode(encoding), content_type='text/plain') + + def save_json(self, path: str, data: dict, indent: int = 2) -> bool: + """ + JSONファイルを保存 + + Args: + path: 保存先パス + data: 辞書データ + indent: インデント幅 + + Returns: + bool: 成功時True + """ + content = json.dumps(data, ensure_ascii=False, indent=indent) + return self.save(path, content.encode('utf-8'), content_type='application/json') + + +# シングルトンインスタンス +_writer_instance: Optional[StorageWriter] = None + + +def get_storage_writer() -> StorageWriter: + """StorageWriterのシングルトンインスタンスを取得""" + global _writer_instance + if _writer_instance is None: + _writer_instance = StorageWriter() + return _writer_instance + + +# 後方互換性のためのエイリアス +# 既存コードで get_storage() を使用している場合のため +def get_storage() -> StorageWriter: + """StorageWriterのシングルトンインスタンスを取得(後方互換性エイリアス)""" + return get_storage_writer() + + +# StorageServiceエイリアス(後方互換性) +StorageService = 
StorageWriter