451 lines
16 KiB
Text
Executable file
451 lines
16 KiB
Text
Executable file
"""
|
||
醫學影像處理程式
|
||
功能:執行 CT 和 MR 影像的處理、轉換和 RTSTRUCT 生成
|
||
|
||
使用方式:
|
||
1. 命令列執行:
|
||
python oar.py /nn/3894670/20241111/CT/a /nn/3894670/20241111/MR/6
|
||
|
||
2. Jupyter Notebook 執行:
|
||
from oar import inference, SendDCM
|
||
rtss_file = inference("/nn/3894670/20241111/CT/a", "/nn/3894670/20241111/MR/6")
|
||
SendDCM(rtss_file)
|
||
"""
|
||
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import logging
|
||
from pathlib import Path
|
||
|
||
from nipype.interfaces.dcm2nii import Dcm2niix
|
||
from pydicom import dcmread
|
||
from pynetdicom import AE, debug_logger
|
||
from pynetdicom.sop_class import CTImageStorage, RTStructureSetStorage
|
||
from rt_utils import RTStructBuilder
|
||
|
||
import numpy as np
|
||
import SimpleITK as sitk
|
||
|
||
from registration.best_reg import reg_transform
|
||
|
||
# 設定日誌
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.StreamHandler(),
|
||
logging.FileHandler('medical_imaging.log')
|
||
]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def validate_dicom_directory(dicom_path):
|
||
"""
|
||
加強版 DICOM 目錄驗證
|
||
|
||
參數:
|
||
dicom_path: str - DICOM 目錄路徑
|
||
|
||
回傳:
|
||
tuple: (is_valid: bool, message: str)
|
||
"""
|
||
try:
|
||
if not os.path.exists(dicom_path):
|
||
return False, f"目錄不存在:{dicom_path}"
|
||
|
||
if not os.listdir(dicom_path):
|
||
return False, f"目錄是空的:{dicom_path}"
|
||
|
||
dicom_files = []
|
||
non_dicom = []
|
||
corrupt_files = []
|
||
series_info = {}
|
||
|
||
for filename in os.listdir(dicom_path):
|
||
filepath = os.path.join(dicom_path, filename)
|
||
if os.path.isfile(filepath):
|
||
try:
|
||
ds = dcmread(filepath)
|
||
# 檢查必要的 DICOM 標籤
|
||
if hasattr(ds, 'SeriesInstanceUID'):
|
||
series_uid = ds.SeriesInstanceUID
|
||
if series_uid not in series_info:
|
||
series_info[series_uid] = []
|
||
series_info[series_uid].append(filepath)
|
||
dicom_files.append(filepath)
|
||
else:
|
||
non_dicom.append(filename)
|
||
except Exception as e:
|
||
logger.warning(f"無法讀取檔案 {filename}: {str(e)}")
|
||
if filename.endswith(('.dcm', '.DCM')):
|
||
corrupt_files.append(filename)
|
||
else:
|
||
non_dicom.append(filename)
|
||
|
||
if not dicom_files:
|
||
message = f"在 {dicom_path} 中找不到有效的 DICOM 檔案\n"
|
||
if non_dicom:
|
||
message += f"發現 {len(non_dicom)} 個非 DICOM 檔案\n"
|
||
if corrupt_files:
|
||
message += f"發現 {len(corrupt_files)} 個損壞的 DICOM 檔案"
|
||
return False, message
|
||
|
||
# 檢查是否所有檔案屬於同一個系列
|
||
if len(series_info) > 1:
|
||
logger.warning(f"發現多個系列: {len(series_info)}")
|
||
|
||
# 檢查檔案數量是否合理
|
||
for series_uid, files in series_info.items():
|
||
if len(files) < 5: # 假設至少需要5張影像
|
||
logger.warning(f"系列 {series_uid} 的影像數量過少: {len(files)}")
|
||
|
||
return True, f"找到 {len(dicom_files)} 個有效的 DICOM 檔案,分屬 {len(series_info)} 個系列"
|
||
|
||
except Exception as e:
|
||
return False, f"驗證過程發生錯誤:{str(e)}"
|
||
|
||
def enhanced_dcm2nii(source_dir, output_dir):
|
||
"""
|
||
加強版的 dcm2nii 轉換函式
|
||
|
||
參數:
|
||
source_dir: str - 來源 DICOM 目錄
|
||
output_dir: str - NIfTI 檔案的輸出目錄
|
||
|
||
回傳:
|
||
tuple: (success: bool, message: str, nifti_files: list)
|
||
"""
|
||
try:
|
||
# 驗證輸入目錄
|
||
is_valid, message = validate_dicom_directory(source_dir)
|
||
if not is_valid:
|
||
return False, message, []
|
||
|
||
# 確保輸出目錄存在
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
# 記錄轉換前的檔案
|
||
initial_files = set(os.listdir(output_dir))
|
||
|
||
# 設定 dcm2niix 的進階選項
|
||
converter = Dcm2niix()
|
||
converter.inputs.source_dir = source_dir
|
||
converter.inputs.output_dir = output_dir
|
||
converter.inputs.compress = 'y'
|
||
converter.inputs.merge_imgs = True
|
||
converter.inputs.single_file = True
|
||
converter.inputs.verbose = True
|
||
converter.terminal_output = 'stream'
|
||
|
||
logger.info(f"執行 DICOM 轉換:{source_dir}")
|
||
try:
|
||
result = converter.run()
|
||
|
||
# 輸出 dcm2niix 的詳細日誌
|
||
if hasattr(result, 'runtime'):
|
||
if hasattr(result.runtime, 'stdout'):
|
||
logger.info(f"dcm2niix 輸出:\n{result.runtime.stdout}")
|
||
if hasattr(result.runtime, 'stderr'):
|
||
logger.warning(f"dcm2niix 錯誤:\n{result.runtime.stderr}")
|
||
except Exception as run_error:
|
||
logger.error(f"dcm2niix 執行錯誤:{str(run_error)}")
|
||
|
||
# 檢查新產生的檔案
|
||
final_files = set(os.listdir(output_dir))
|
||
new_files = final_files - initial_files
|
||
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
|
||
|
||
if not nifti_files:
|
||
# 嘗試使用備用轉換選項
|
||
logger.warning("使用備用轉換選項重試...")
|
||
converter.inputs.merge_imgs = False
|
||
converter.inputs.single_file = False
|
||
try:
|
||
result = converter.run()
|
||
except Exception as retry_error:
|
||
logger.error(f"備用轉換選項執行錯誤:{str(retry_error)}")
|
||
|
||
final_files = set(os.listdir(output_dir))
|
||
new_files = final_files - initial_files
|
||
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
|
||
|
||
if not nifti_files:
|
||
return False, "轉換完成但未產生 NIfTI 檔案", []
|
||
|
||
# 驗證產生的 NIfTI 檔案
|
||
valid_nifti = []
|
||
for nii_file in nifti_files:
|
||
try:
|
||
img = sitk.ReadImage(os.path.join(output_dir, nii_file))
|
||
if img.GetSize()[2] > 1: # 確保是3D影像
|
||
valid_nifti.append(nii_file)
|
||
else:
|
||
logger.warning(f"檔案 {nii_file} 不是有效的3D影像")
|
||
except Exception as e:
|
||
logger.warning(f"無法讀取 NIfTI 檔案 {nii_file}: {str(e)}")
|
||
|
||
if not valid_nifti:
|
||
return False, "轉換產生的 NIfTI 檔案無效", []
|
||
|
||
return True, f"成功轉換 {len(valid_nifti)} 個有效的 NIfTI 檔案", valid_nifti
|
||
|
||
except Exception as e:
|
||
logger.error(f"DICOM 轉換錯誤:{str(e)}", exc_info=True)
|
||
return False, f"轉換過程發生錯誤:{str(e)}", []
|
||
|
||
def inference(DCM_CT, DCM_MR):
|
||
"""
|
||
執行影像推論和後處理
|
||
|
||
參數:
|
||
DCM_CT: str - CT 影像的 DICOM 目錄路徑
|
||
DCM_MR: str - MR 影像的 DICOM 目錄路徑
|
||
|
||
回傳:
|
||
str: 生成的 RTSTRUCT 檔案路徑
|
||
"""
|
||
logger.info(f"處理影像\nCT: {DCM_CT}\nMR: {DCM_MR}")
|
||
|
||
try:
|
||
# 驗證輸入目錄
|
||
for path, desc in [(DCM_CT, "CT"), (DCM_MR, "MR")]:
|
||
is_valid, message = validate_dicom_directory(path)
|
||
if not is_valid:
|
||
raise ValueError(f"{desc} 影像驗證失敗:{message}")
|
||
|
||
# 設定工作目錄
|
||
ROOT_DIR = os.path.dirname(os.path.dirname(DCM_CT))
|
||
NII_DIR = os.path.join(ROOT_DIR, 'nii')
|
||
INPUT_DIR = os.path.join(ROOT_DIR, 'input')
|
||
OUTPUT_DIR = os.path.join(ROOT_DIR, 'output')
|
||
|
||
# 設定 RTSTRUCT 輸出路徑
|
||
head, tail = os.path.split(DCM_CT)
|
||
rtss_file = os.path.join(head, tail+'-rtss.dcm')
|
||
|
||
# 清理並建立工作目錄
|
||
for dir_path in [NII_DIR, INPUT_DIR, OUTPUT_DIR]:
|
||
shutil.rmtree(dir_path, ignore_errors=True)
|
||
os.makedirs(dir_path, exist_ok=True)
|
||
|
||
# 取得基本檔名
|
||
nCT = os.path.basename(DCM_CT)
|
||
nMR = os.path.basename(DCM_MR)
|
||
|
||
# DICOM 轉 NIfTI
|
||
logger.info("開始轉換 CT DICOM...")
|
||
success_ct, message_ct, ct_files = enhanced_dcm2nii(DCM_CT, NII_DIR)
|
||
if not success_ct:
|
||
raise RuntimeError(f"CT 轉換失敗:{message_ct}")
|
||
|
||
logger.info("開始轉換 MR DICOM...")
|
||
success_mr, message_mr, mr_files = enhanced_dcm2nii(DCM_MR, NII_DIR)
|
||
if not success_mr:
|
||
raise RuntimeError(f"MR 轉換失敗:{message_mr}")
|
||
|
||
# 尋找轉換後的檔案
|
||
NII_CT = None
|
||
NII_MR = None
|
||
for f in os.listdir(NII_DIR):
|
||
if f.endswith('.nii.gz'):
|
||
full_path = os.path.join(NII_DIR, f)
|
||
if f.startswith(nCT+'_'):
|
||
NII_CT = full_path
|
||
logger.info(f"找到 CT NIfTI 檔案:{f}")
|
||
elif f.startswith(nMR+'_'):
|
||
NII_MR = full_path
|
||
logger.info(f"找到 MR NIfTI 檔案:{f}")
|
||
|
||
if not NII_CT:
|
||
raise FileNotFoundError(f"找不到 CT 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
|
||
if not NII_MR:
|
||
raise FileNotFoundError(f"找不到 MR 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
|
||
|
||
# 準備輸入檔案
|
||
basename = os.path.basename(NII_MR)
|
||
old = '_'+basename.split('_')[-1]
|
||
input_file = os.path.join(INPUT_DIR, basename.replace(old, '_0000.nii.gz'))
|
||
output_file = os.path.join(OUTPUT_DIR, basename.replace(old, '.nii.gz'))
|
||
|
||
basename_ct = os.path.basename(NII_CT)
|
||
old_ct = '_'+basename_ct.split('_')[-1]
|
||
label_file = os.path.join(NII_DIR, basename_ct.replace(old_ct, '.label.nii.gz'))
|
||
|
||
logger.info(f"複製 MR NIfTI 到輸入目錄:{input_file}")
|
||
shutil.copy(NII_MR, input_file)
|
||
|
||
logger.info("準備執行模型推論...")
|
||
try:
|
||
# 執行模型推論
|
||
subprocess.run([
|
||
"nnUNetv2_predict",
|
||
"-i", INPUT_DIR,
|
||
"-o", OUTPUT_DIR,
|
||
"-m", "2d",
|
||
"-f", "0",
|
||
"-tr", "nnUNetTrainer",
|
||
"-p", "nnUNetPlans",
|
||
"-t", "501",
|
||
"--save_npz"
|
||
], check=True, capture_output=True, text=True)
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
logger.error(f"模型推論失敗:\n輸出:{e.output}\n錯誤:{e.stderr}")
|
||
raise RuntimeError("模型推論失敗,請檢查日誌獲取詳細資訊")
|
||
|
||
logger.info(f"模型推論完成:{output_file}")
|
||
|
||
if not os.path.exists(output_file):
|
||
raise FileNotFoundError(f"找不到模型輸出檔案:{output_file}")
|
||
|
||
logger.info("開始執行影像配準...")
|
||
|
||
# 影像配準
|
||
try:
|
||
reg_transform(NII_CT, NII_MR, output_file, label_file)
|
||
except Exception as e:
|
||
logger.error(f"影像配準失敗:{str(e)}")
|
||
raise RuntimeError("影像配準失敗")
|
||
|
||
if not os.path.exists(label_file):
|
||
raise FileNotFoundError(f"找不到配準後的標籤檔案:{label_file}")
|
||
|
||
logger.info("開始建立 RTSTRUCT...")
|
||
|
||
# 讀取 CT 影像序列
|
||
try:
|
||
reader = sitk.ImageSeriesReader()
|
||
dicom_names = reader.GetGDCMSeriesFileNames(DCM_CT)
|
||
reader.SetFileNames(dicom_names)
|
||
reader.MetaDataDictionaryArrayUpdateOn()
|
||
reader.LoadPrivateTagsOn()
|
||
image = reader.Execute()
|
||
except Exception as e:
|
||
logger.error(f"讀取 CT 系列失敗:{str(e)}")
|
||
raise RuntimeError("無法讀取 CT 影像系列")
|
||
|
||
# 重新取樣預測結果
|
||
try:
|
||
nnU = sitk.ReadImage(label_file)
|
||
nnU = sitk.Resample(nnU, image, sitk.Transform(), sitk.sitkNearestNeighbor)
|
||
except Exception as e:
|
||
logger.error(f"重新取樣失敗:{str(e)}")
|
||
raise RuntimeError("預測結果重新取樣失敗")
|
||
|
||
# 連通元件分析
|
||
try:
|
||
ccfilter = sitk.ConnectedComponentImageFilter()
|
||
nnUCC = ccfilter.Execute(nnU)
|
||
ObjectCount1 = ccfilter.GetObjectCount()
|
||
|
||
if ObjectCount1 == 0:
|
||
logger.warning("未找到任何連通元件")
|
||
except Exception as e:
|
||
logger.error(f"連通元件分析失敗:{str(e)}")
|
||
raise RuntimeError("連通元件分析失敗")
|
||
|
||
# 建立 RTSTRUCT
|
||
try:
|
||
rtstruct = RTStructBuilder.create_new(dicom_series_path=DCM_CT)
|
||
|
||
# 處理每個連通元件
|
||
for j1 in range(ObjectCount1):
|
||
label1 = sitk.BinaryThreshold(nnUCC, j1+1, j1+1)
|
||
mask = sitk.GetArrayFromImage(label1).astype(bool)
|
||
mask = np.transpose(mask, (1, 2, 0))
|
||
if mask.any():
|
||
logger.info(f"處理 ROI {j1+1}/{ObjectCount1}")
|
||
rtstruct.add_roi(
|
||
mask=mask,
|
||
color=[255, 0, 0],
|
||
name=f"Tumor_{j1+1}"
|
||
)
|
||
|
||
logger.info(f"儲存 RTSTRUCT:{rtss_file}")
|
||
rtstruct.save(rtss_file)
|
||
except Exception as e:
|
||
logger.error(f"RTSTRUCT 生成失敗:{str(e)}")
|
||
raise RuntimeError("無法生成或儲存 RTSTRUCT")
|
||
|
||
return rtss_file
|
||
|
||
except Exception as e:
|
||
logger.error(f"推論過程發生錯誤:{str(e)}", exc_info=True)
|
||
raise
|
||
|
||
def SendDCM(fp):
|
||
"""
|
||
傳送 DICOM 檔案到 PACS
|
||
|
||
參數:
|
||
fp: str - DICOM 檔案路徑
|
||
"""
|
||
logger.info(f"準備傳送 DICOM 檔案:{fp}")
|
||
debug_logger()
|
||
|
||
if not os.path.exists(fp):
|
||
raise FileNotFoundError(f"找不到 DICOM 檔案:{fp}")
|
||
|
||
try:
|
||
ae = AE()
|
||
ae.ae_title = 'OUR_STORE_SCP'
|
||
ae.add_requested_context(RTStructureSetStorage)
|
||
|
||
try:
|
||
ds = dcmread(fp)
|
||
except Exception as e:
|
||
logger.error(f"讀取 DICOM 檔案失敗:{str(e)}")
|
||
raise RuntimeError("無法讀取 DICOM 檔案")
|
||
|
||
logger.info("正在連接 PACS 伺服器...")
|
||
try:
|
||
assoc = ae.associate("172.16.40.36", 104, ae_title='N1000_STORAGE')
|
||
except Exception as e:
|
||
logger.error(f"PACS 連接失敗:{str(e)}")
|
||
raise RuntimeError("無法連接 PACS 伺服器")
|
||
|
||
if assoc.is_established:
|
||
try:
|
||
status = assoc.send_c_store(ds)
|
||
if status:
|
||
logger.info(f'DICOM 傳送成功,狀態碼: 0x{status.Status:04x}')
|
||
else:
|
||
raise RuntimeError('傳送失敗:連接逾時或收到無效回應')
|
||
except Exception as e:
|
||
logger.error(f"DICOM 傳送失敗:{str(e)}")
|
||
raise
|
||
finally:
|
||
assoc.release()
|
||
else:
|
||
raise RuntimeError('傳送失敗:無法建立連接')
|
||
|
||
except Exception as e:
|
||
logger.error(f"DICOM 傳送過程發生錯誤:{str(e)}", exc_info=True)
|
||
raise
|
||
|
||
def main():
|
||
"""主程式進入點"""
|
||
if len(sys.argv) < 3:
|
||
print('使用方式:', sys.argv[0], '<CT_DICOM_路徑> <MR_DICOM_路徑>')
|
||
print('範例: python oar.py /nn/3894670/20241111/CT/a /nn/3894670/20241111/MR/6')
|
||
sys.exit(1)
|
||
|
||
logger.info('開始執行')
|
||
logger.info('程式名稱: %s', sys.argv[0])
|
||
logger.info('CT路徑: %s', sys.argv[1])
|
||
logger.info('MR路徑: %s', sys.argv[2])
|
||
|
||
start_time = time.time()
|
||
try:
|
||
rtss_file = inference(sys.argv[1], sys.argv[2])
|
||
SendDCM(rtss_file)
|
||
logger.info(f"處理完成,總耗時:{time.time() - start_time:.2f} 秒")
|
||
except Exception as e:
|
||
logger.error(f"處理過程發生錯誤:{str(e)}", exc_info=True)
|
||
sys.exit(1)
|
||
|
||
if __name__ == '__main__':
|
||
main()
|