123/oar.py.save

452 lines
16 KiB
Text
Raw Normal View History

2025-02-01 07:57:22 +00:00
"""
醫學影像處理程式
功能:執行 CT 和 MR 影像的處理、轉換和 RTSTRUCT 生成
使用方式:
1. 命令列執行:
python oar.py /nn/3894670/20241111/CT/a /nn/3894670/20241111/MR/6
2. Jupyter Notebook 執行:
from oar import inference, SendDCM
rtss_file = inference("/nn/3894670/20241111/CT/a", "/nn/3894670/20241111/MR/6")
SendDCM(rtss_file)
"""
import os
import shutil
import subprocess
import sys
import time
import logging
from pathlib import Path
from nipype.interfaces.dcm2nii import Dcm2niix
from pydicom import dcmread
from pynetdicom import AE, debug_logger
from pynetdicom.sop_class import CTImageStorage, RTStructureSetStorage
from rt_utils import RTStructBuilder
import numpy as np
import SimpleITK as sitk
from registration.best_reg import reg_transform
# 設定日誌
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('medical_imaging.log')
]
)
logger = logging.getLogger(__name__)
def validate_dicom_directory(dicom_path):
"""
加強版 DICOM 目錄驗證
參數:
dicom_path: str - DICOM 目錄路徑
回傳:
tuple: (is_valid: bool, message: str)
"""
try:
if not os.path.exists(dicom_path):
return False, f"目錄不存在:{dicom_path}"
if not os.listdir(dicom_path):
return False, f"目錄是空的:{dicom_path}"
dicom_files = []
non_dicom = []
corrupt_files = []
series_info = {}
for filename in os.listdir(dicom_path):
filepath = os.path.join(dicom_path, filename)
if os.path.isfile(filepath):
try:
ds = dcmread(filepath)
# 檢查必要的 DICOM 標籤
if hasattr(ds, 'SeriesInstanceUID'):
series_uid = ds.SeriesInstanceUID
if series_uid not in series_info:
series_info[series_uid] = []
series_info[series_uid].append(filepath)
dicom_files.append(filepath)
else:
non_dicom.append(filename)
except Exception as e:
logger.warning(f"無法讀取檔案 {filename}: {str(e)}")
if filename.endswith(('.dcm', '.DCM')):
corrupt_files.append(filename)
else:
non_dicom.append(filename)
if not dicom_files:
message = f"在 {dicom_path} 中找不到有效的 DICOM 檔案\n"
if non_dicom:
message += f"發現 {len(non_dicom)} 個非 DICOM 檔案\n"
if corrupt_files:
message += f"發現 {len(corrupt_files)} 個損壞的 DICOM 檔案"
return False, message
# 檢查是否所有檔案屬於同一個系列
if len(series_info) > 1:
logger.warning(f"發現多個系列: {len(series_info)}")
# 檢查檔案數量是否合理
for series_uid, files in series_info.items():
if len(files) < 5: # 假設至少需要5張影像
logger.warning(f"系列 {series_uid} 的影像數量過少: {len(files)}")
return True, f"找到 {len(dicom_files)} 個有效的 DICOM 檔案,分屬 {len(series_info)} 個系列"
except Exception as e:
return False, f"驗證過程發生錯誤:{str(e)}"
def enhanced_dcm2nii(source_dir, output_dir):
"""
加強版的 dcm2nii 轉換函式
參數:
source_dir: str - 來源 DICOM 目錄
output_dir: str - NIfTI 檔案的輸出目錄
回傳:
tuple: (success: bool, message: str, nifti_files: list)
"""
try:
# 驗證輸入目錄
is_valid, message = validate_dicom_directory(source_dir)
if not is_valid:
return False, message, []
# 確保輸出目錄存在
os.makedirs(output_dir, exist_ok=True)
# 記錄轉換前的檔案
initial_files = set(os.listdir(output_dir))
# 設定 dcm2niix 的進階選項
converter = Dcm2niix()
converter.inputs.source_dir = source_dir
converter.inputs.output_dir = output_dir
converter.inputs.compress = 'y'
converter.inputs.merge_imgs = True
converter.inputs.single_file = True
converter.inputs.verbose = True
converter.terminal_output = 'stream'
logger.info(f"執行 DICOM 轉換:{source_dir}")
try:
result = converter.run()
# 輸出 dcm2niix 的詳細日誌
if hasattr(result, 'runtime'):
if hasattr(result.runtime, 'stdout'):
logger.info(f"dcm2niix 輸出:\n{result.runtime.stdout}")
if hasattr(result.runtime, 'stderr'):
logger.warning(f"dcm2niix 錯誤:\n{result.runtime.stderr}")
except Exception as run_error:
logger.error(f"dcm2niix 執行錯誤:{str(run_error)}")
# 檢查新產生的檔案
final_files = set(os.listdir(output_dir))
new_files = final_files - initial_files
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
if not nifti_files:
# 嘗試使用備用轉換選項
logger.warning("使用備用轉換選項重試...")
converter.inputs.merge_imgs = False
converter.inputs.single_file = False
try:
result = converter.run()
except Exception as retry_error:
logger.error(f"備用轉換選項執行錯誤:{str(retry_error)}")
final_files = set(os.listdir(output_dir))
new_files = final_files - initial_files
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
if not nifti_files:
return False, "轉換完成但未產生 NIfTI 檔案", []
# 驗證產生的 NIfTI 檔案
valid_nifti = []
for nii_file in nifti_files:
try:
img = sitk.ReadImage(os.path.join(output_dir, nii_file))
if img.GetSize()[2] > 1: # 確保是3D影像
valid_nifti.append(nii_file)
else:
logger.warning(f"檔案 {nii_file} 不是有效的3D影像")
except Exception as e:
logger.warning(f"無法讀取 NIfTI 檔案 {nii_file}: {str(e)}")
if not valid_nifti:
return False, "轉換產生的 NIfTI 檔案無效", []
return True, f"成功轉換 {len(valid_nifti)} 個有效的 NIfTI 檔案", valid_nifti
except Exception as e:
logger.error(f"DICOM 轉換錯誤:{str(e)}", exc_info=True)
return False, f"轉換過程發生錯誤:{str(e)}", []
def inference(DCM_CT, DCM_MR):
"""
執行影像推論和後處理
參數:
DCM_CT: str - CT 影像的 DICOM 目錄路徑
DCM_MR: str - MR 影像的 DICOM 目錄路徑
回傳:
str: 生成的 RTSTRUCT 檔案路徑
"""
logger.info(f"處理影像\nCT: {DCM_CT}\nMR: {DCM_MR}")
try:
# 驗證輸入目錄
for path, desc in [(DCM_CT, "CT"), (DCM_MR, "MR")]:
is_valid, message = validate_dicom_directory(path)
if not is_valid:
raise ValueError(f"{desc} 影像驗證失敗:{message}")
# 設定工作目錄
ROOT_DIR = os.path.dirname(os.path.dirname(DCM_CT))
NII_DIR = os.path.join(ROOT_DIR, 'nii')
INPUT_DIR = os.path.join(ROOT_DIR, 'input')
OUTPUT_DIR = os.path.join(ROOT_DIR, 'output')
# 設定 RTSTRUCT 輸出路徑
head, tail = os.path.split(DCM_CT)
rtss_file = os.path.join(head, tail+'-rtss.dcm')
# 清理並建立工作目錄
for dir_path in [NII_DIR, INPUT_DIR, OUTPUT_DIR]:
shutil.rmtree(dir_path, ignore_errors=True)
os.makedirs(dir_path, exist_ok=True)
# 取得基本檔名
nCT = os.path.basename(DCM_CT)
nMR = os.path.basename(DCM_MR)
# DICOM 轉 NIfTI
logger.info("開始轉換 CT DICOM...")
success_ct, message_ct, ct_files = enhanced_dcm2nii(DCM_CT, NII_DIR)
if not success_ct:
raise RuntimeError(f"CT 轉換失敗:{message_ct}")
logger.info("開始轉換 MR DICOM...")
success_mr, message_mr, mr_files = enhanced_dcm2nii(DCM_MR, NII_DIR)
if not success_mr:
raise RuntimeError(f"MR 轉換失敗:{message_mr}")
# 尋找轉換後的檔案
NII_CT = None
NII_MR = None
for f in os.listdir(NII_DIR):
if f.endswith('.nii.gz'):
full_path = os.path.join(NII_DIR, f)
if f.startswith(nCT+'_'):
NII_CT = full_path
logger.info(f"找到 CT NIfTI 檔案:{f}")
elif f.startswith(nMR+'_'):
NII_MR = full_path
logger.info(f"找到 MR NIfTI 檔案:{f}")
if not NII_CT:
raise FileNotFoundError(f"找不到 CT 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
if not NII_MR:
raise FileNotFoundError(f"找不到 MR 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
# 準備輸入檔案
basename = os.path.basename(NII_MR)
old = '_'+basename.split('_')[-1]
input_file = os.path.join(INPUT_DIR, basename.replace(old, '_0000.nii.gz'))
output_file = os.path.join(OUTPUT_DIR, basename.replace(old, '.nii.gz'))
basename_ct = os.path.basename(NII_CT)
old_ct = '_'+basename_ct.split('_')[-1]
label_file = os.path.join(NII_DIR, basename_ct.replace(old_ct, '.label.nii.gz'))
logger.info(f"複製 MR NIfTI 到輸入目錄:{input_file}")
shutil.copy(NII_MR, input_file)
logger.info("準備執行模型推論...")
try:
# 執行模型推論
subprocess.run([
"nnUNetv2_predict",
"-i", INPUT_DIR,
"-o", OUTPUT_DIR,
"-m", "2d",
"-f", "0",
"-tr", "nnUNetTrainer",
"-p", "nnUNetPlans",
"-t", "501",
"--save_npz"
], check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
logger.error(f"模型推論失敗:\n輸出{e.output}\n錯誤{e.stderr}")
raise RuntimeError("模型推論失敗,請檢查日誌獲取詳細資訊")
logger.info(f"模型推論完成:{output_file}")
if not os.path.exists(output_file):
raise FileNotFoundError(f"找不到模型輸出檔案:{output_file}")
logger.info("開始執行影像配準...")
# 影像配準
try:
reg_transform(NII_CT, NII_MR, output_file, label_file)
except Exception as e:
logger.error(f"影像配準失敗:{str(e)}")
raise RuntimeError("影像配準失敗")
if not os.path.exists(label_file):
raise FileNotFoundError(f"找不到配準後的標籤檔案:{label_file}")
logger.info("開始建立 RTSTRUCT...")
# 讀取 CT 影像序列
try:
reader = sitk.ImageSeriesReader()
dicom_names = reader.GetGDCMSeriesFileNames(DCM_CT)
reader.SetFileNames(dicom_names)
reader.MetaDataDictionaryArrayUpdateOn()
reader.LoadPrivateTagsOn()
image = reader.Execute()
except Exception as e:
logger.error(f"讀取 CT 系列失敗:{str(e)}")
raise RuntimeError("無法讀取 CT 影像系列")
# 重新取樣預測結果
try:
nnU = sitk.ReadImage(label_file)
nnU = sitk.Resample(nnU, image, sitk.Transform(), sitk.sitkNearestNeighbor)
except Exception as e:
logger.error(f"重新取樣失敗:{str(e)}")
raise RuntimeError("預測結果重新取樣失敗")
# 連通元件分析
try:
ccfilter = sitk.ConnectedComponentImageFilter()
nnUCC = ccfilter.Execute(nnU)
ObjectCount1 = ccfilter.GetObjectCount()
if ObjectCount1 == 0:
logger.warning("未找到任何連通元件")
except Exception as e:
logger.error(f"連通元件分析失敗:{str(e)}")
raise RuntimeError("連通元件分析失敗")
# 建立 RTSTRUCT
try:
rtstruct = RTStructBuilder.create_new(dicom_series_path=DCM_CT)
# 處理每個連通元件
for j1 in range(ObjectCount1):
label1 = sitk.BinaryThreshold(nnUCC, j1+1, j1+1)
mask = sitk.GetArrayFromImage(label1).astype(bool)
mask = np.transpose(mask, (1, 2, 0))
if mask.any():
logger.info(f"處理 ROI {j1+1}/{ObjectCount1}")
rtstruct.add_roi(
mask=mask,
color=[255, 0, 0],
name=f"Tumor_{j1+1}"
)
logger.info(f"儲存 RTSTRUCT{rtss_file}")
rtstruct.save(rtss_file)
except Exception as e:
logger.error(f"RTSTRUCT 生成失敗:{str(e)}")
raise RuntimeError("無法生成或儲存 RTSTRUCT")
return rtss_file
except Exception as e:
logger.error(f"推論過程發生錯誤:{str(e)}", exc_info=True)
raise
def SendDCM(fp):
"""
傳送 DICOM 檔案到 PACS
參數:
fp: str - DICOM 檔案路徑
"""
logger.info(f"準備傳送 DICOM 檔案:{fp}")
debug_logger()
if not os.path.exists(fp):
raise FileNotFoundError(f"找不到 DICOM 檔案:{fp}")
try:
ae = AE()
ae.ae_title = 'OUR_STORE_SCP'
ae.add_requested_context(RTStructureSetStorage)
try:
ds = dcmread(fp)
except Exception as e:
logger.error(f"讀取 DICOM 檔案失敗:{str(e)}")
raise RuntimeError("無法讀取 DICOM 檔案")
logger.info("正在連接 PACS 伺服器...")
try:
assoc = ae.associate("172.16.40.36", 104, ae_title='N1000_STORAGE')
except Exception as e:
logger.error(f"PACS 連接失敗:{str(e)}")
raise RuntimeError("無法連接 PACS 伺服器")
if assoc.is_established:
try:
status = assoc.send_c_store(ds)
if status:
logger.info(f'DICOM 傳送成功,狀態碼: 0x{status.Status:04x}')
else:
raise RuntimeError('傳送失敗:連接逾時或收到無效回應')
except Exception as e:
logger.error(f"DICOM 傳送失敗:{str(e)}")
raise
finally:
assoc.release()
else:
raise RuntimeError('傳送失敗:無法建立連接')
except Exception as e:
logger.error(f"DICOM 傳送過程發生錯誤:{str(e)}", exc_info=True)
raise
def main():
"""主程式進入點"""
if len(sys.argv) < 3:
print('使用方式:', sys.argv[0], '<CT_DICOM_路徑> <MR_DICOM_路徑>')
print('範例: python oar.py /nn/3894670/20241111/CT/a /nn/3894670/20241111/MR/6')
sys.exit(1)
logger.info('開始執行')
logger.info('程式名稱: %s', sys.argv[0])
logger.info('CT路徑: %s', sys.argv[1])
logger.info('MR路徑: %s', sys.argv[2])
start_time = time.time()
try:
rtss_file = inference(sys.argv[1], sys.argv[2])
SendDCM(rtss_file)
logger.info(f"處理完成,總耗時:{time.time() - start_time:.2f} 秒")
except Exception as e:
logger.error(f"處理過程發生錯誤:{str(e)}", exc_info=True)
sys.exit(1)
if __name__ == '__main__':
main()