123/oar.py.save
2025-02-01 15:57:22 +08:00

451 lines
16 KiB
Text
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
醫學影像處理程式
功能:執行 CT 和 MR 影像的處理、轉換和 RTSTRUCT 生成
使用方式:
1. 命令列執行:
python oar.py /nn/3894670/20241111/CT/a /nn/3894670/20241111/MR/6
2. Jupyter Notebook 執行:
from oar import inference, SendDCM
rtss_file = inference("/nn/3894670/20241111/CT/a", "/nn/3894670/20241111/MR/6")
SendDCM(rtss_file)
"""
import os
import shutil
import subprocess
import sys
import time
import logging
from pathlib import Path
from nipype.interfaces.dcm2nii import Dcm2niix
from pydicom import dcmread
from pynetdicom import AE, debug_logger
from pynetdicom.sop_class import CTImageStorage, RTStructureSetStorage
from rt_utils import RTStructBuilder
import numpy as np
import SimpleITK as sitk
from registration.best_reg import reg_transform
# 設定日誌
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('medical_imaging.log')
]
)
logger = logging.getLogger(__name__)
def validate_dicom_directory(dicom_path):
"""
加強版 DICOM 目錄驗證
參數:
dicom_path: str - DICOM 目錄路徑
回傳:
tuple: (is_valid: bool, message: str)
"""
try:
if not os.path.exists(dicom_path):
return False, f"目錄不存在:{dicom_path}"
if not os.listdir(dicom_path):
return False, f"目錄是空的:{dicom_path}"
dicom_files = []
non_dicom = []
corrupt_files = []
series_info = {}
for filename in os.listdir(dicom_path):
filepath = os.path.join(dicom_path, filename)
if os.path.isfile(filepath):
try:
ds = dcmread(filepath)
# 檢查必要的 DICOM 標籤
if hasattr(ds, 'SeriesInstanceUID'):
series_uid = ds.SeriesInstanceUID
if series_uid not in series_info:
series_info[series_uid] = []
series_info[series_uid].append(filepath)
dicom_files.append(filepath)
else:
non_dicom.append(filename)
except Exception as e:
logger.warning(f"無法讀取檔案 {filename}: {str(e)}")
if filename.endswith(('.dcm', '.DCM')):
corrupt_files.append(filename)
else:
non_dicom.append(filename)
if not dicom_files:
message = f"在 {dicom_path} 中找不到有效的 DICOM 檔案\n"
if non_dicom:
message += f"發現 {len(non_dicom)} 個非 DICOM 檔案\n"
if corrupt_files:
message += f"發現 {len(corrupt_files)} 個損壞的 DICOM 檔案"
return False, message
# 檢查是否所有檔案屬於同一個系列
if len(series_info) > 1:
logger.warning(f"發現多個系列: {len(series_info)}")
# 檢查檔案數量是否合理
for series_uid, files in series_info.items():
if len(files) < 5: # 假設至少需要5張影像
logger.warning(f"系列 {series_uid} 的影像數量過少: {len(files)}")
return True, f"找到 {len(dicom_files)} 個有效的 DICOM 檔案,分屬 {len(series_info)} 個系列"
except Exception as e:
return False, f"驗證過程發生錯誤:{str(e)}"
def enhanced_dcm2nii(source_dir, output_dir):
"""
加強版的 dcm2nii 轉換函式
參數:
source_dir: str - 來源 DICOM 目錄
output_dir: str - NIfTI 檔案的輸出目錄
回傳:
tuple: (success: bool, message: str, nifti_files: list)
"""
try:
# 驗證輸入目錄
is_valid, message = validate_dicom_directory(source_dir)
if not is_valid:
return False, message, []
# 確保輸出目錄存在
os.makedirs(output_dir, exist_ok=True)
# 記錄轉換前的檔案
initial_files = set(os.listdir(output_dir))
# 設定 dcm2niix 的進階選項
converter = Dcm2niix()
converter.inputs.source_dir = source_dir
converter.inputs.output_dir = output_dir
converter.inputs.compress = 'y'
converter.inputs.merge_imgs = True
converter.inputs.single_file = True
converter.inputs.verbose = True
converter.terminal_output = 'stream'
logger.info(f"執行 DICOM 轉換:{source_dir}")
try:
result = converter.run()
# 輸出 dcm2niix 的詳細日誌
if hasattr(result, 'runtime'):
if hasattr(result.runtime, 'stdout'):
logger.info(f"dcm2niix 輸出:\n{result.runtime.stdout}")
if hasattr(result.runtime, 'stderr'):
logger.warning(f"dcm2niix 錯誤:\n{result.runtime.stderr}")
except Exception as run_error:
logger.error(f"dcm2niix 執行錯誤:{str(run_error)}")
# 檢查新產生的檔案
final_files = set(os.listdir(output_dir))
new_files = final_files - initial_files
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
if not nifti_files:
# 嘗試使用備用轉換選項
logger.warning("使用備用轉換選項重試...")
converter.inputs.merge_imgs = False
converter.inputs.single_file = False
try:
result = converter.run()
except Exception as retry_error:
logger.error(f"備用轉換選項執行錯誤:{str(retry_error)}")
final_files = set(os.listdir(output_dir))
new_files = final_files - initial_files
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
if not nifti_files:
return False, "轉換完成但未產生 NIfTI 檔案", []
# 驗證產生的 NIfTI 檔案
valid_nifti = []
for nii_file in nifti_files:
try:
img = sitk.ReadImage(os.path.join(output_dir, nii_file))
if img.GetSize()[2] > 1: # 確保是3D影像
valid_nifti.append(nii_file)
else:
logger.warning(f"檔案 {nii_file} 不是有效的3D影像")
except Exception as e:
logger.warning(f"無法讀取 NIfTI 檔案 {nii_file}: {str(e)}")
if not valid_nifti:
return False, "轉換產生的 NIfTI 檔案無效", []
return True, f"成功轉換 {len(valid_nifti)} 個有效的 NIfTI 檔案", valid_nifti
except Exception as e:
logger.error(f"DICOM 轉換錯誤:{str(e)}", exc_info=True)
return False, f"轉換過程發生錯誤:{str(e)}", []
def inference(DCM_CT, DCM_MR):
"""
執行影像推論和後處理
參數:
DCM_CT: str - CT 影像的 DICOM 目錄路徑
DCM_MR: str - MR 影像的 DICOM 目錄路徑
回傳:
str: 生成的 RTSTRUCT 檔案路徑
"""
logger.info(f"處理影像\nCT: {DCM_CT}\nMR: {DCM_MR}")
try:
# 驗證輸入目錄
for path, desc in [(DCM_CT, "CT"), (DCM_MR, "MR")]:
is_valid, message = validate_dicom_directory(path)
if not is_valid:
raise ValueError(f"{desc} 影像驗證失敗:{message}")
# 設定工作目錄
ROOT_DIR = os.path.dirname(os.path.dirname(DCM_CT))
NII_DIR = os.path.join(ROOT_DIR, 'nii')
INPUT_DIR = os.path.join(ROOT_DIR, 'input')
OUTPUT_DIR = os.path.join(ROOT_DIR, 'output')
# 設定 RTSTRUCT 輸出路徑
head, tail = os.path.split(DCM_CT)
rtss_file = os.path.join(head, tail+'-rtss.dcm')
# 清理並建立工作目錄
for dir_path in [NII_DIR, INPUT_DIR, OUTPUT_DIR]:
shutil.rmtree(dir_path, ignore_errors=True)
os.makedirs(dir_path, exist_ok=True)
# 取得基本檔名
nCT = os.path.basename(DCM_CT)
nMR = os.path.basename(DCM_MR)
# DICOM 轉 NIfTI
logger.info("開始轉換 CT DICOM...")
success_ct, message_ct, ct_files = enhanced_dcm2nii(DCM_CT, NII_DIR)
if not success_ct:
raise RuntimeError(f"CT 轉換失敗:{message_ct}")
logger.info("開始轉換 MR DICOM...")
success_mr, message_mr, mr_files = enhanced_dcm2nii(DCM_MR, NII_DIR)
if not success_mr:
raise RuntimeError(f"MR 轉換失敗:{message_mr}")
# 尋找轉換後的檔案
NII_CT = None
NII_MR = None
for f in os.listdir(NII_DIR):
if f.endswith('.nii.gz'):
full_path = os.path.join(NII_DIR, f)
if f.startswith(nCT+'_'):
NII_CT = full_path
logger.info(f"找到 CT NIfTI 檔案:{f}")
elif f.startswith(nMR+'_'):
NII_MR = full_path
logger.info(f"找到 MR NIfTI 檔案:{f}")
if not NII_CT:
raise FileNotFoundError(f"找不到 CT 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
if not NII_MR:
raise FileNotFoundError(f"找不到 MR 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
# 準備輸入檔案
basename = os.path.basename(NII_MR)
old = '_'+basename.split('_')[-1]
input_file = os.path.join(INPUT_DIR, basename.replace(old, '_0000.nii.gz'))
output_file = os.path.join(OUTPUT_DIR, basename.replace(old, '.nii.gz'))
basename_ct = os.path.basename(NII_CT)
old_ct = '_'+basename_ct.split('_')[-1]
label_file = os.path.join(NII_DIR, basename_ct.replace(old_ct, '.label.nii.gz'))
logger.info(f"複製 MR NIfTI 到輸入目錄:{input_file}")
shutil.copy(NII_MR, input_file)
logger.info("準備執行模型推論...")
try:
# 執行模型推論
subprocess.run([
"nnUNetv2_predict",
"-i", INPUT_DIR,
"-o", OUTPUT_DIR,
"-m", "2d",
"-f", "0",
"-tr", "nnUNetTrainer",
"-p", "nnUNetPlans",
"-t", "501",
"--save_npz"
], check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
logger.error(f"模型推論失敗:\n輸出{e.output}\n錯誤{e.stderr}")
raise RuntimeError("模型推論失敗,請檢查日誌獲取詳細資訊")
logger.info(f"模型推論完成:{output_file}")
if not os.path.exists(output_file):
raise FileNotFoundError(f"找不到模型輸出檔案:{output_file}")
logger.info("開始執行影像配準...")
# 影像配準
try:
reg_transform(NII_CT, NII_MR, output_file, label_file)
except Exception as e:
logger.error(f"影像配準失敗:{str(e)}")
raise RuntimeError("影像配準失敗")
if not os.path.exists(label_file):
raise FileNotFoundError(f"找不到配準後的標籤檔案:{label_file}")
logger.info("開始建立 RTSTRUCT...")
# 讀取 CT 影像序列
try:
reader = sitk.ImageSeriesReader()
dicom_names = reader.GetGDCMSeriesFileNames(DCM_CT)
reader.SetFileNames(dicom_names)
reader.MetaDataDictionaryArrayUpdateOn()
reader.LoadPrivateTagsOn()
image = reader.Execute()
except Exception as e:
logger.error(f"讀取 CT 系列失敗:{str(e)}")
raise RuntimeError("無法讀取 CT 影像系列")
# 重新取樣預測結果
try:
nnU = sitk.ReadImage(label_file)
nnU = sitk.Resample(nnU, image, sitk.Transform(), sitk.sitkNearestNeighbor)
except Exception as e:
logger.error(f"重新取樣失敗:{str(e)}")
raise RuntimeError("預測結果重新取樣失敗")
# 連通元件分析
try:
ccfilter = sitk.ConnectedComponentImageFilter()
nnUCC = ccfilter.Execute(nnU)
ObjectCount1 = ccfilter.GetObjectCount()
if ObjectCount1 == 0:
logger.warning("未找到任何連通元件")
except Exception as e:
logger.error(f"連通元件分析失敗:{str(e)}")
raise RuntimeError("連通元件分析失敗")
# 建立 RTSTRUCT
try:
rtstruct = RTStructBuilder.create_new(dicom_series_path=DCM_CT)
# 處理每個連通元件
for j1 in range(ObjectCount1):
label1 = sitk.BinaryThreshold(nnUCC, j1+1, j1+1)
mask = sitk.GetArrayFromImage(label1).astype(bool)
mask = np.transpose(mask, (1, 2, 0))
if mask.any():
logger.info(f"處理 ROI {j1+1}/{ObjectCount1}")
rtstruct.add_roi(
mask=mask,
color=[255, 0, 0],
name=f"Tumor_{j1+1}"
)
logger.info(f"儲存 RTSTRUCT{rtss_file}")
rtstruct.save(rtss_file)
except Exception as e:
logger.error(f"RTSTRUCT 生成失敗:{str(e)}")
raise RuntimeError("無法生成或儲存 RTSTRUCT")
return rtss_file
except Exception as e:
logger.error(f"推論過程發生錯誤:{str(e)}", exc_info=True)
raise
def SendDCM(fp):
"""
傳送 DICOM 檔案到 PACS
參數:
fp: str - DICOM 檔案路徑
"""
logger.info(f"準備傳送 DICOM 檔案:{fp}")
debug_logger()
if not os.path.exists(fp):
raise FileNotFoundError(f"找不到 DICOM 檔案:{fp}")
try:
ae = AE()
ae.ae_title = 'OUR_STORE_SCP'
ae.add_requested_context(RTStructureSetStorage)
try:
ds = dcmread(fp)
except Exception as e:
logger.error(f"讀取 DICOM 檔案失敗:{str(e)}")
raise RuntimeError("無法讀取 DICOM 檔案")
logger.info("正在連接 PACS 伺服器...")
try:
assoc = ae.associate("172.16.40.36", 104, ae_title='N1000_STORAGE')
except Exception as e:
logger.error(f"PACS 連接失敗:{str(e)}")
raise RuntimeError("無法連接 PACS 伺服器")
if assoc.is_established:
try:
status = assoc.send_c_store(ds)
if status:
logger.info(f'DICOM 傳送成功,狀態碼: 0x{status.Status:04x}')
else:
raise RuntimeError('傳送失敗:連接逾時或收到無效回應')
except Exception as e:
logger.error(f"DICOM 傳送失敗:{str(e)}")
raise
finally:
assoc.release()
else:
raise RuntimeError('傳送失敗:無法建立連接')
except Exception as e:
logger.error(f"DICOM 傳送過程發生錯誤:{str(e)}", exc_info=True)
raise
def main():
"""主程式進入點"""
if len(sys.argv) < 3:
print('使用方式:', sys.argv[0], '<CT_DICOM_路徑> <MR_DICOM_路徑>')
print('範例: python oar.py /nn/3894670/20241111/CT/a /nn/3894670/20241111/MR/6')
sys.exit(1)
logger.info('開始執行')
logger.info('程式名稱: %s', sys.argv[0])
logger.info('CT路徑: %s', sys.argv[1])
logger.info('MR路徑: %s', sys.argv[2])
start_time = time.time()
try:
rtss_file = inference(sys.argv[1], sys.argv[2])
SendDCM(rtss_file)
logger.info(f"處理完成,總耗時:{time.time() - start_time:.2f} 秒")
except Exception as e:
logger.error(f"處理過程發生錯誤:{str(e)}", exc_info=True)
sys.exit(1)
if __name__ == '__main__':
main()