729 lines
28 KiB
Python
729 lines
28 KiB
Python
![]() |
|
|||
|
from pathlib import Path
|
|||
|
from typing import Dict, List, Optional, Tuple
|
|||
|
|
|||
|
import os
|
|||
|
import shutil
|
|||
|
import subprocess
|
|||
|
import sys
|
|||
|
import time
|
|||
|
import logging
|
|||
|
import colorsys
|
|||
|
|
|||
|
from nipype.interfaces.dcm2nii import Dcm2niix
|
|||
|
from pydicom import dcmread
|
|||
|
from pynetdicom import AE, debug_logger
|
|||
|
from pynetdicom.sop_class import CTImageStorage, RTStructureSetStorage
|
|||
|
from rt_utils import RTStructBuilder
|
|||
|
|
|||
|
import numpy as np
|
|||
|
import SimpleITK as sitk
|
|||
|
|
|||
|
from pyradise.data import (Subject, IntensityImage, SegmentationImage,
|
|||
|
Modality, Organ, Annotator)
|
|||
|
from pyradise.fileio.extraction import SimpleModalityExtractor
|
|||
|
import pyradise.data as ps_data
|
|||
|
import pyradise.fileio as ps_io
|
|||
|
|
|||
|
from registration.best_reg import reg_only, reg_transform
|
|||
|
|
|||
|
class ExampleModalityExtractor(ps_io.ModalityExtractor):
|
|||
|
|
|||
|
def extract_from_dicom(self,
|
|||
|
path: str
|
|||
|
) -> Optional[ps_data.Modality]:
|
|||
|
# Extract the necessary attributes from the DICOM file
|
|||
|
tags = (ps_io.Tag((0x0008, 0x0060)), # Modality
|
|||
|
ps_io.Tag((0x0008, 0x103e))) # Series Description
|
|||
|
dataset_dict = self._load_dicom_attributes(tags, path)
|
|||
|
|
|||
|
# Identify the modality rule-based
|
|||
|
modality = dataset_dict.get('Modality', {}).get('value', None)
|
|||
|
series_desc = dataset_dict.get('Series Description', {}).get('value', '')
|
|||
|
if modality == 'MR':
|
|||
|
if 't1' in series_desc.lower():
|
|||
|
return ps_data.Modality('T1')
|
|||
|
elif 't2' in series_desc.lower():
|
|||
|
return ps_data.Modality('T2')
|
|||
|
else:
|
|||
|
return None
|
|||
|
elif modality == 'CT':
|
|||
|
return ps_data.Modality('CT')
|
|||
|
else:
|
|||
|
return None
|
|||
|
|
|||
|
def extract_from_path(self,
|
|||
|
path: str
|
|||
|
) -> Optional[ps_data.Modality]:
|
|||
|
|
|||
|
if 'CT' in path:
|
|||
|
return ps_data.Modality('CT')
|
|||
|
|
|||
|
# Identify the discrete image file's modality rule-based
|
|||
|
filename = os.path.basename(path)
|
|||
|
|
|||
|
# Check if the image contains an img prefix
|
|||
|
# (i.e., it is a intensity image)
|
|||
|
if not filename.startswith('img'):
|
|||
|
return None
|
|||
|
|
|||
|
# Check if the image contains a modality search string
|
|||
|
if 'T1' in filename:
|
|||
|
return ps_data.Modality('T1')
|
|||
|
elif 'T2' in filename:
|
|||
|
return ps_data.Modality('T2')
|
|||
|
else:
|
|||
|
return None
|
|||
|
|
|||
|
|
|||
|
def generate_distinct_colors(n):
|
|||
|
"""生成 n 個視覺上區分度高的顏色"""
|
|||
|
colors = []
|
|||
|
for i in range(n):
|
|||
|
# 使用黃金比例來產生分散的色相值
|
|||
|
hue = (i * 0.618033988749895) % 1
|
|||
|
# 使用較高的飽和度和亮度以確保顏色明顯
|
|||
|
saturation = 0.7 + (i % 3) * 0.1 # 0.7-0.9
|
|||
|
value = 0.8 + (i % 2) * 0.1 # 0.8-0.9
|
|||
|
|
|||
|
# 轉換 HSV 到 RGB
|
|||
|
rgb = colorsys.hsv_to_rgb(hue, saturation, value)
|
|||
|
# 轉換到 0-255 範圍
|
|||
|
color = [int(x * 255) for x in rgb]
|
|||
|
colors.append(color)
|
|||
|
|
|||
|
return colors
|
|||
|
|
|||
|
# 設定顏色映射
|
|||
|
LABEL_COLORS: Dict[str, List[int]] = {
|
|||
|
"Brainstem": [0, 255, 0], # 綠色
|
|||
|
"Right Eye": [255, 255, 224], # 淺黃色
|
|||
|
"Left Eye": [255, 255, 224], # 淺黃色
|
|||
|
"Optic Chiasm": [0, 0, 255], # 藍色
|
|||
|
"Right_Optic_Nerve": [255, 127, 80], # 珊瑚紅
|
|||
|
"Left_Optic_Nerve": [255, 127, 80] # 珊瑚紅
|
|||
|
}
|
|||
|
|
|||
|
# 設定標籤映射
|
|||
|
LABEL_MAPPING: Dict[int, str] = {
|
|||
|
1: "Brainstem",
|
|||
|
2: "Right Eye",
|
|||
|
3: "Left Eye",
|
|||
|
4: "Optic Chiasm",
|
|||
|
5: "Right_Optic_Nerve",
|
|||
|
6: "Left_Optic_Nerve"
|
|||
|
}
|
|||
|
|
|||
|
# 設定日誌
|
|||
|
logging.basicConfig(
|
|||
|
level=logging.INFO,
|
|||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|||
|
handlers=[
|
|||
|
logging.StreamHandler(),
|
|||
|
logging.FileHandler('medical_imaging.log')
|
|||
|
]
|
|||
|
)
|
|||
|
logger = logging.getLogger(__name__)
|
|||
|
|
|||
|
def validate_dicom_directory(dicom_path):
|
|||
|
"""加強版 DICOM 目錄驗證"""
|
|||
|
try:
|
|||
|
if not os.path.exists(dicom_path):
|
|||
|
return False, f"目錄不存在:{dicom_path}"
|
|||
|
|
|||
|
if not os.listdir(dicom_path):
|
|||
|
return False, f"目錄是空的:{dicom_path}"
|
|||
|
|
|||
|
dicom_files = []
|
|||
|
non_dicom = []
|
|||
|
corrupt_files = []
|
|||
|
series_info = {}
|
|||
|
|
|||
|
for filename in os.listdir(dicom_path):
|
|||
|
filepath = os.path.join(dicom_path, filename)
|
|||
|
if os.path.isfile(filepath):
|
|||
|
try:
|
|||
|
ds = dcmread(filepath)
|
|||
|
if hasattr(ds, 'SeriesInstanceUID'):
|
|||
|
series_uid = ds.SeriesInstanceUID
|
|||
|
if series_uid not in series_info:
|
|||
|
series_info[series_uid] = []
|
|||
|
series_info[series_uid].append(filepath)
|
|||
|
dicom_files.append(filepath)
|
|||
|
else:
|
|||
|
non_dicom.append(filename)
|
|||
|
except Exception as e:
|
|||
|
logger.warning(f"無法讀取檔案 {filename}: {str(e)}")
|
|||
|
if filename.endswith(('.dcm', '.DCM')):
|
|||
|
corrupt_files.append(filename)
|
|||
|
else:
|
|||
|
non_dicom.append(filename)
|
|||
|
|
|||
|
if not dicom_files:
|
|||
|
message = f"在 {dicom_path} 中找不到有效的 DICOM 檔案\n"
|
|||
|
if non_dicom:
|
|||
|
message += f"發現 {len(non_dicom)} 個非 DICOM 檔案\n"
|
|||
|
if corrupt_files:
|
|||
|
message += f"發現 {len(corrupt_files)} 個損壞的 DICOM 檔案"
|
|||
|
return False, message
|
|||
|
|
|||
|
if len(series_info) > 1:
|
|||
|
logger.warning(f"發現多個系列: {len(series_info)}")
|
|||
|
|
|||
|
for series_uid, files in series_info.items():
|
|||
|
if len(files) < 5:
|
|||
|
logger.warning(f"系列 {series_uid} 的影像數量過少: {len(files)}")
|
|||
|
|
|||
|
return True, f"找到 {len(dicom_files)} 個有效的 DICOM 檔案,分屬 {len(series_info)} 個系列"
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
return False, f"驗證過程發生錯誤:{str(e)}"
|
|||
|
|
|||
|
def enhanced_dcm2nii(source_dir, output_dir):
|
|||
|
"""加強版的 dcm2nii 轉換函式"""
|
|||
|
try:
|
|||
|
is_valid, message = validate_dicom_directory(source_dir)
|
|||
|
if not is_valid:
|
|||
|
return False, message, []
|
|||
|
|
|||
|
os.makedirs(output_dir, exist_ok=True)
|
|||
|
initial_files = set(os.listdir(output_dir))
|
|||
|
|
|||
|
converter = Dcm2niix()
|
|||
|
converter.inputs.source_dir = source_dir
|
|||
|
converter.inputs.output_dir = output_dir
|
|||
|
converter.inputs.compress = 'y'
|
|||
|
converter.inputs.merge_imgs = True
|
|||
|
converter.inputs.single_file = True
|
|||
|
converter.inputs.verbose = True
|
|||
|
converter.terminal_output = 'stream'
|
|||
|
|
|||
|
logger.info(f"執行 DICOM 轉換:{source_dir}")
|
|||
|
try:
|
|||
|
result = converter.run()
|
|||
|
|
|||
|
if hasattr(result, 'runtime'):
|
|||
|
if hasattr(result.runtime, 'stdout'):
|
|||
|
logger.info(f"dcm2niix 輸出:\n{result.runtime.stdout}")
|
|||
|
if hasattr(result.runtime, 'stderr'):
|
|||
|
logger.warning(f"dcm2niix 錯誤:\n{result.runtime.stderr}")
|
|||
|
except Exception as run_error:
|
|||
|
logger.error(f"dcm2niix 執行錯誤:{str(run_error)}")
|
|||
|
|
|||
|
final_files = set(os.listdir(output_dir))
|
|||
|
new_files = final_files - initial_files
|
|||
|
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
|
|||
|
|
|||
|
if not nifti_files:
|
|||
|
logger.warning("使用備用轉換選項重試...")
|
|||
|
converter.inputs.merge_imgs = False
|
|||
|
converter.inputs.single_file = False
|
|||
|
try:
|
|||
|
result = converter.run()
|
|||
|
except Exception as retry_error:
|
|||
|
logger.error(f"備用轉換選項執行錯誤:{str(retry_error)}")
|
|||
|
|
|||
|
final_files = set(os.listdir(output_dir))
|
|||
|
new_files = final_files - initial_files
|
|||
|
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
|
|||
|
|
|||
|
if not nifti_files:
|
|||
|
return False, "轉換完成但未產生 NIfTI 檔案", []
|
|||
|
|
|||
|
valid_nifti = []
|
|||
|
for nii_file in nifti_files:
|
|||
|
try:
|
|||
|
img = sitk.ReadImage(os.path.join(output_dir, nii_file))
|
|||
|
if img.GetSize()[2] > 1:
|
|||
|
valid_nifti.append(nii_file)
|
|||
|
else:
|
|||
|
logger.warning(f"檔案 {nii_file} 不是有效的3D影像")
|
|||
|
except Exception as e:
|
|||
|
logger.warning(f"無法讀取 NIfTI 檔案 {nii_file}: {str(e)}")
|
|||
|
|
|||
|
if not valid_nifti:
|
|||
|
return False, "轉換產生的 NIfTI 檔案無效", []
|
|||
|
|
|||
|
return True, f"成功轉換 {len(valid_nifti)} 個有效的 NIfTI 檔案", valid_nifti
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"DICOM 轉換錯誤:{str(e)}", exc_info=True)
|
|||
|
return False, f"轉換過程發生錯誤:{str(e)}", []
|
|||
|
|
|||
|
def process_symmetric_structures(image_array):
|
|||
|
"""處理對稱結構的後處理函數"""
|
|||
|
logger.info("開始處理對稱結構")
|
|||
|
result_img = image_array.copy()
|
|||
|
|
|||
|
# 定義需要處理的結構及其標籤
|
|||
|
structures = [
|
|||
|
{'name': '眼睛', 'labels': (2, 3), 'kernel_radius': 2},
|
|||
|
{'name': '視神經', 'labels': (5, 6), 'kernel_radius': 2}
|
|||
|
]
|
|||
|
|
|||
|
# 處理每個對稱結構
|
|||
|
for structure in structures:
|
|||
|
logger.info(f"處理{structure['name']}結構")
|
|||
|
# 合併左右標籤
|
|||
|
combined_mask = np.zeros_like(result_img, dtype=np.uint8)
|
|||
|
for label in structure['labels']:
|
|||
|
combined_mask |= (result_img == label)
|
|||
|
|
|||
|
# 形態學閉運算
|
|||
|
combined_mask_sitk = sitk.GetImageFromArray(combined_mask)
|
|||
|
closing_filter = sitk.BinaryMorphologicalClosingImageFilter()
|
|||
|
closing_filter.SetKernelRadius(structure['kernel_radius'])
|
|||
|
processed_mask = sitk.GetArrayFromImage(closing_filter.Execute(combined_mask_sitk))
|
|||
|
|
|||
|
# 連通區域標記
|
|||
|
labeled_components = sitk.GetArrayFromImage(
|
|||
|
sitk.ConnectedComponent(sitk.GetImageFromArray(processed_mask))
|
|||
|
)
|
|||
|
|
|||
|
# 找出最大的兩個區域
|
|||
|
unique, counts = np.unique(labeled_components, return_counts=True)
|
|||
|
if len(unique) >= 3: # 確保至少有兩個區域(除了背景)
|
|||
|
region_sizes = [(i, count) for i, count in zip(unique[1:], counts[1:])]
|
|||
|
sorted_regions = sorted(region_sizes, key=lambda x: x[1], reverse=True)
|
|||
|
|
|||
|
# 獲取最大的兩個區域
|
|||
|
region1 = (labeled_components == sorted_regions[0][0])
|
|||
|
region2 = (labeled_components == sorted_regions[1][0])
|
|||
|
|
|||
|
# 根據質心的x座標決定左右
|
|||
|
center1 = np.mean(np.where(region1)[2])
|
|||
|
center2 = np.mean(np.where(region2)[2])
|
|||
|
|
|||
|
# 分配左右標籤
|
|||
|
if center1 < center2:
|
|||
|
right, left = region1, region2
|
|||
|
else:
|
|||
|
right, left = region2, region1
|
|||
|
|
|||
|
# 更新結果
|
|||
|
result_img[right] = structure['labels'][0] # 右側標籤
|
|||
|
result_img[left] = structure['labels'][1] # 左側標籤
|
|||
|
|
|||
|
logger.info(f"完成{structure['name']}的左右結構處理")
|
|||
|
else:
|
|||
|
logger.warning(f"無法找到足夠的{structure['name']}區域進行處理")
|
|||
|
|
|||
|
return result_img
|
|||
|
|
|||
|
def process_multiple_tumors(image_array):
|
|||
|
"""處理多個腫瘤的函數"""
|
|||
|
logger.info("開始處理多個腫瘤")
|
|||
|
result_img = image_array.copy()
|
|||
|
|
|||
|
# 假設原始標記中 7 代表腫瘤
|
|||
|
tumor_mask = (result_img == 7)
|
|||
|
if not tumor_mask.any():
|
|||
|
logger.warning("未找到腫瘤區域")
|
|||
|
return result_img
|
|||
|
|
|||
|
# 使用連通區域分析找出不同的腫瘤
|
|||
|
tumor_mask_sitk = sitk.GetImageFromArray(tumor_mask.astype(np.uint8))
|
|||
|
connected_filter = sitk.ConnectedComponentImageFilter()
|
|||
|
connected_filter.FullyConnectedOn()
|
|||
|
labeled_tumors = sitk.GetArrayFromImage(connected_filter.Execute(tumor_mask_sitk))
|
|||
|
|
|||
|
num_tumors = connected_filter.GetObjectCount()
|
|||
|
logger.info(f"找到 {num_tumors} 個腫瘤")
|
|||
|
|
|||
|
if num_tumors == 0:
|
|||
|
return result_img
|
|||
|
|
|||
|
# 為每個腫瘤生成唯一的標籤值,從最大現有標籤值開始
|
|||
|
max_existing_label = max(LABEL_MAPPING.keys())
|
|||
|
tumor_colors = generate_distinct_colors(num_tumors)
|
|||
|
|
|||
|
# 更新標籤映射和顏色映射
|
|||
|
for i in range(num_tumors):
|
|||
|
tumor_label = max_existing_label + i + 1
|
|||
|
tumor_name = f"TV_{i+1}"
|
|||
|
LABEL_MAPPING[tumor_label] = tumor_name
|
|||
|
LABEL_COLORS[tumor_name] = tumor_colors[i]
|
|||
|
|
|||
|
# 更新結果圖像中的腫瘤標籤
|
|||
|
result_img[labeled_tumors == (i + 1)] = tumor_label
|
|||
|
logger.info(f"標記腫瘤 {tumor_name} 使用標籤 {tumor_label} 和顏色 {tumor_colors[i]}")
|
|||
|
|
|||
|
return result_img
|
|||
|
|
|||
|
def create_structure_masks(image: sitk.Image) -> Dict[str, np.ndarray]:
|
|||
|
"""從分割結果創建每個結構的遮罩"""
|
|||
|
array = sitk.GetArrayFromImage(image)
|
|||
|
masks = {}
|
|||
|
|
|||
|
for label_id, structure_name in LABEL_MAPPING.items():
|
|||
|
mask = (array == label_id)
|
|||
|
if mask.any():
|
|||
|
masks[structure_name] = np.transpose(mask, (1, 2, 0))
|
|||
|
|
|||
|
return masks
|
|||
|
|
|||
|
def inference(DCM_CT, DCM_MR):
|
|||
|
"""執行影像推論和後處理"""
|
|||
|
logger.info(f"處理影像\nCT: {DCM_CT}\nMR: {DCM_MR}")
|
|||
|
|
|||
|
try:
|
|||
|
# 驗證# 驗證輸入目錄
|
|||
|
for path, desc in [(DCM_CT, "CT"), (DCM_MR, "MR")]:
|
|||
|
is_valid, message = validate_dicom_directory(path)
|
|||
|
if not is_valid:
|
|||
|
raise ValueError(f"{desc} 影像驗證失敗:{message}")
|
|||
|
|
|||
|
# 設定工作目錄
|
|||
|
ROOT_DIR = os.path.dirname(os.path.dirname(DCM_CT))
|
|||
|
NII_DIR = os.path.join(ROOT_DIR, 'nii')
|
|||
|
INPUT_DIR = os.path.join(ROOT_DIR, 'input')
|
|||
|
OUTPUT_DIR = os.path.join(ROOT_DIR, 'output')
|
|||
|
|
|||
|
# 設定 RTSTRUCT 輸出路徑
|
|||
|
head, tail = os.path.split(DCM_CT)
|
|||
|
rtss_file = os.path.join(head, tail+'-rtss.dcm')
|
|||
|
|
|||
|
# 清理並建立工作目錄
|
|||
|
for dir_path in [NII_DIR, INPUT_DIR, OUTPUT_DIR]:
|
|||
|
shutil.rmtree(dir_path, ignore_errors=True)
|
|||
|
os.makedirs(dir_path, exist_ok=True)
|
|||
|
|
|||
|
# 取得基本檔名
|
|||
|
nCT = os.path.basename(DCM_CT)
|
|||
|
nMR = os.path.basename(DCM_MR)
|
|||
|
|
|||
|
# DICOM 轉 NIfTI
|
|||
|
logger.info("開始轉換 CT DICOM...")
|
|||
|
success_ct, message_ct, ct_files = enhanced_dcm2nii(DCM_CT, NII_DIR)
|
|||
|
if not success_ct:
|
|||
|
raise RuntimeError(f"CT 轉換失敗:{message_ct}")
|
|||
|
|
|||
|
logger.info("開始轉換 MR DICOM...")
|
|||
|
success_mr, message_mr, mr_files = enhanced_dcm2nii(DCM_MR, NII_DIR)
|
|||
|
if not success_mr:
|
|||
|
raise RuntimeError(f"MR 轉換失敗:{message_mr}")
|
|||
|
|
|||
|
# 尋找轉換後的檔案
|
|||
|
NII_CT = None
|
|||
|
NII_MR = None
|
|||
|
for f in os.listdir(NII_DIR):
|
|||
|
if f.endswith('.nii.gz'):
|
|||
|
full_path = os.path.join(NII_DIR, f)
|
|||
|
if f.startswith(nCT+'_'):
|
|||
|
NII_CT = full_path
|
|||
|
logger.info(f"找到 CT NIfTI 檔案:{f}")
|
|||
|
elif f.startswith(nMR+'_'):
|
|||
|
NII_MR = full_path
|
|||
|
logger.info(f"找到 MR NIfTI 檔案:{f}")
|
|||
|
|
|||
|
if not NII_CT:
|
|||
|
raise FileNotFoundError(f"找不到 CT 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
|
|||
|
if not NII_MR:
|
|||
|
raise FileNotFoundError(f"找不到 MR 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
|
|||
|
|
|||
|
# 準備輸入檔案
|
|||
|
|
|||
|
basename = os.path.basename(NII_MR)
|
|||
|
|
|||
|
old = '_'+basename.split('_')[-1]
|
|||
|
|
|||
|
input_ct = os.path.join(INPUT_DIR, basename.replace(old, '_0000.nii.gz'))
|
|||
|
shutil.copy(NII_CT, input_ct)
|
|||
|
|
|||
|
input_mr = os.path.join(INPUT_DIR, basename.replace(old, '_0001.nii.gz'))
|
|||
|
logger.info("Registration of %s and %s..."%(NII_CT, NII_MR))
|
|||
|
reg_only(NII_CT, NII_MR, input_mr)
|
|||
|
|
|||
|
output_file = os.path.join(OUTPUT_DIR, basename.replace(old, '.nii.gz'))
|
|||
|
|
|||
|
basename_ct = os.path.basename(NII_CT)
|
|||
|
old_ct = '_'+basename_ct.split('_')[-1]
|
|||
|
label_file = os.path.join(NII_DIR, basename_ct.replace(old_ct, '.label.nii.gz'))
|
|||
|
|
|||
|
# basename = os.path.basename(NII_MR)
|
|||
|
# old = '_'+basename.split('_')[-1]
|
|||
|
# input_file = os.path.join(INPUT_DIR, basename.replace(old, '_0000.nii.gz'))
|
|||
|
# output_file = os.path.join(OUTPUT_DIR, basename.replace(old, '.nii.gz'))
|
|||
|
|
|||
|
# basename_ct = os.path.basename(NII_CT)
|
|||
|
# old_ct = '_'+basename_ct.split('_')[-1]
|
|||
|
# label_file = os.path.join(NII_DIR, basename_ct.replace(old_ct, '.label.nii.gz'))
|
|||
|
|
|||
|
# logger.info(f"複製 MR NIfTI 到輸入目錄:{input_file}")
|
|||
|
# shutil.copy(NII_MR, input_file)
|
|||
|
|
|||
|
logger.info("準備執行模型推論...")
|
|||
|
|
|||
|
# 設定 nnUNet 環境變數
|
|||
|
model_dir = "/123/onlylian"
|
|||
|
os.environ['nnUNet_raw'] = "/123/onlylian/nnUNet_raw"
|
|||
|
os.environ['nnUNet_preprocessed'] = "/123/onlylian/nnUNet_preprocessed"
|
|||
|
os.environ['nnUNet_results'] = "/123/onlylian/nnUNet_results"
|
|||
|
|
|||
|
# 明確設定可用的資料集路徑
|
|||
|
dataset_dir = os.path.join(model_dir, "nnUNet_results", "Dataset505")
|
|||
|
logger.info(f"使用資料集目錄:{dataset_dir}")
|
|||
|
|
|||
|
if not os.path.exists(dataset_dir):
|
|||
|
logger.error(f"資料集目錄不存在:{dataset_dir}")
|
|||
|
raise FileNotFoundError(f"找不到資料集目錄:{dataset_dir}")
|
|||
|
|
|||
|
try:
|
|||
|
# 執行模型推論
|
|||
|
predict_cmd = [
|
|||
|
"nnUNetv2_predict",
|
|||
|
"-i", INPUT_DIR,
|
|||
|
"-o", OUTPUT_DIR,
|
|||
|
# "-d", "505",
|
|||
|
"-d", "222",
|
|||
|
# "-c", "3d_fullres",
|
|||
|
"-c", "3d_lowres",
|
|||
|
# "-f", "0",
|
|||
|
"-tr", "nnUNetTrainer",
|
|||
|
"-p", "nnUNetPlans"
|
|||
|
]
|
|||
|
|
|||
|
logger.info(f"執行推論指令:{' '.join(predict_cmd)}")
|
|||
|
result = subprocess.run(
|
|||
|
predict_cmd,
|
|||
|
check=True,
|
|||
|
capture_output=True,
|
|||
|
text=True,
|
|||
|
env=os.environ
|
|||
|
)
|
|||
|
|
|||
|
logger.info(f"模型推論輸出:\n{result.stdout}")
|
|||
|
|
|||
|
except subprocess.CalledProcessError as e:
|
|||
|
logger.error(f"模型推論失敗:\n輸出:{e.output}\n錯誤:{e.stderr}")
|
|||
|
raise RuntimeError(f"模型推論失敗:{e.stderr}")
|
|||
|
|
|||
|
if not os.path.exists(output_file):
|
|||
|
raise FileNotFoundError(f"找不到模型輸出檔案:{output_file}")
|
|||
|
|
|||
|
logger.info("開始執行影像後處理...")
|
|||
|
|
|||
|
try:
|
|||
|
# 讀取預測結果
|
|||
|
predicted_image = sitk.ReadImage(output_file)
|
|||
|
predicted_image = sitk.DICOMOrient(predicted_image)
|
|||
|
predicted_array = sitk.GetArrayFromImage(predicted_image)
|
|||
|
|
|||
|
# 執行後處理
|
|||
|
processed_array = process_symmetric_structures(predicted_array)
|
|||
|
processed_array = process_multiple_tumors(processed_array)
|
|||
|
|
|||
|
processed_image = sitk.GetImageFromArray(processed_array)
|
|||
|
processed_image.CopyInformation(predicted_image)
|
|||
|
|
|||
|
# 暫時保存後處理結果
|
|||
|
processed_output = os.path.join(OUTPUT_DIR, 'processed_' + os.path.basename(output_file))
|
|||
|
sitk.WriteImage(processed_image, processed_output)
|
|||
|
logger.info(f"後處理結果已保存至:{processed_output}")
|
|||
|
|
|||
|
logger.info("開始執行影像配準...")
|
|||
|
# reg_transform(NII_CT, NII_MR, processed_output, label_file)
|
|||
|
shutil.copy(processed_output, label_file)
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"影像處理失敗:{str(e)}")
|
|||
|
raise RuntimeError("後處理或配準失敗")
|
|||
|
|
|||
|
if not os.path.exists(label_file):
|
|||
|
raise FileNotFoundError(f"找不到配準後的標籤檔案:{label_file}")
|
|||
|
|
|||
|
logger.info("開始建立 RTSTRUCT...")
|
|||
|
|
|||
|
# 讀取 CT 影像序列
|
|||
|
try:
|
|||
|
reader = sitk.ImageSeriesReader()
|
|||
|
dicom_names = reader.GetGDCMSeriesFileNames(DCM_CT)
|
|||
|
reader.SetFileNames(dicom_names)
|
|||
|
reader.MetaDataDictionaryArrayUpdateOn()
|
|||
|
reader.LoadPrivateTagsOn()
|
|||
|
image = reader.Execute()
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"讀取 CT 系列失敗:{str(e)}")
|
|||
|
raise RuntimeError("無法讀取 CT 影像系列")
|
|||
|
|
|||
|
# 讀取並重新取樣配準後的預測結果
|
|||
|
try:
|
|||
|
final_image = sitk.ReadImage(label_file)
|
|||
|
final_image = sitk.Resample(
|
|||
|
final_image,
|
|||
|
image,
|
|||
|
sitk.Transform(),
|
|||
|
sitk.sitkNearestNeighbor
|
|||
|
)
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"重新取樣失敗:{str(e)}")
|
|||
|
raise RuntimeError("預測結果重新取樣失敗")
|
|||
|
|
|||
|
# 建立 RTSTRUCT
|
|||
|
try:
|
|||
|
|
|||
|
images = []
|
|||
|
images.append(IntensityImage(image, Modality('CT')))
|
|||
|
# images.append(IntensityImage(sitk.ReadImage(input_mr), Modality('MR')))
|
|||
|
|
|||
|
for k, v in LABEL_MAPPING.items():
|
|||
|
images.append(SegmentationImage(final_image==k, v))
|
|||
|
|
|||
|
dcm_crawler = ps_io.SubjectDicomCrawler(DCM_CT,
|
|||
|
modality_extractor=ExampleModalityExtractor(),
|
|||
|
)
|
|||
|
dicom_series_info = dcm_crawler.execute()
|
|||
|
|
|||
|
subject = Subject(dicom_series_info[0].get_patient_id(), images)
|
|||
|
|
|||
|
reference_modality = 'CT'
|
|||
|
|
|||
|
use_3d_conversion = True
|
|||
|
if use_3d_conversion:
|
|||
|
conv_conf = ps_io.RTSSConverter3DConfiguration()
|
|||
|
else:
|
|||
|
conv_conf = ps_io.RTSSConverter2DConfiguration()
|
|||
|
meta_data = ps_io.RTSSMetaData(
|
|||
|
series_description = '%s by onlylian'%type(conv_conf).__name__)
|
|||
|
|
|||
|
converter = ps_io.SubjectToRTSSConverter(
|
|||
|
subject,
|
|||
|
dicom_series_info,
|
|||
|
reference_modality,
|
|||
|
conv_conf,
|
|||
|
meta_data,
|
|||
|
# colors = tuple(LABEL_COLORS.values()),
|
|||
|
)
|
|||
|
|
|||
|
rtss = converter.convert()
|
|||
|
|
|||
|
|
|||
|
# Write the DICOM-RTSS to a separate subject directory
|
|||
|
# and include the DICOM files crawled before
|
|||
|
# Note: If you want to output just a subset of the
|
|||
|
# original DICOM files you may use additional selectors
|
|||
|
|
|||
|
output_dir_path = os.path.join(ROOT_DIR, 'pyradise')
|
|||
|
rtss_filename = os.path.basename(rtss_file)
|
|||
|
rtss_combination = ((rtss_filename, rtss),)
|
|||
|
print(output_dir_path)
|
|||
|
print(rtss_combination)
|
|||
|
|
|||
|
# shutil.rmtree(output_dir_path, ignore_errors=True)
|
|||
|
os.makedirs(output_dir_path, exist_ok=True)
|
|||
|
|
|||
|
writer = ps_io.DicomSeriesSubjectWriter()
|
|||
|
writer.write(rtss_combination,
|
|||
|
output_dir_path,
|
|||
|
# subject.get_name(),
|
|||
|
None,
|
|||
|
dicom_series_info,)
|
|||
|
|
|||
|
shutil.copy(os.path.join(output_dir_path, rtss_filename), rtss_file)
|
|||
|
|
|||
|
# rtstruct = RTStructBuilder.create_new(dicom_series_path=DCM_CT)
|
|||
|
|
|||
|
# # 為每個解剖結構創建遮罩
|
|||
|
# structure_masks = create_structure_masks(final_image)
|
|||
|
|
|||
|
# # 確認是否有找到任何結構
|
|||
|
# if not structure_masks:
|
|||
|
# logger.warning("未找到任何有效的結構遮罩")
|
|||
|
|
|||
|
# # 添加每個結構到 RTSTRUCT
|
|||
|
# for structure_name, mask in structure_masks.items():
|
|||
|
# if mask.any(): # 確保遮罩不是空的
|
|||
|
# logger.info(f"添加結構 {structure_name}")
|
|||
|
# try:
|
|||
|
# rtstruct.add_roi(
|
|||
|
# mask=mask,
|
|||
|
# color=LABEL_COLORS[structure_name],
|
|||
|
# name=structure_name
|
|||
|
# )
|
|||
|
# logger.info(f"成功添加 {structure_name}")
|
|||
|
# except Exception as roi_error:
|
|||
|
# logger.error(f"添加 ROI {structure_name} 時發生錯誤: {str(roi_error)}")
|
|||
|
# continue
|
|||
|
|
|||
|
# logger.info(f"儲存 RTSTRUCT:{rtss_file}")
|
|||
|
# rtstruct.save(rtss_file)
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"RTSTRUCT 生成失敗:{str(e)}")
|
|||
|
raise RuntimeError("無法生成或儲存 RTSTRUCT")
|
|||
|
|
|||
|
return rtss_file
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"推論過程發生錯誤:{str(e)}", exc_info=True)
|
|||
|
raise
|
|||
|
|
|||
|
def SendDCM(fp):
|
|||
|
"""傳送 DICOM 檔案到 PACS"""
|
|||
|
logger.info(f"準備傳送 DICOM 檔案:{fp}")
|
|||
|
debug_logger()
|
|||
|
|
|||
|
if not os.path.exists(fp):
|
|||
|
raise FileNotFoundError(f"找不到 DICOM 檔案:{fp}")
|
|||
|
|
|||
|
try:
|
|||
|
ae = AE()
|
|||
|
ae.ae_title = 'OUR_STORE_SCP'
|
|||
|
ae.add_requested_context(RTStructureSetStorage)
|
|||
|
|
|||
|
try:
|
|||
|
ds = dcmread(fp)
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"讀取 DICOM 檔案失敗:{str(e)}")
|
|||
|
raise RuntimeError("無法讀取 DICOM 檔案")
|
|||
|
|
|||
|
logger.info("正在連接 PACS 伺服器...")
|
|||
|
try:
|
|||
|
assoc = ae.associate("172.16.40.36", 104, ae_title='N1000_STORAGE')
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"PACS 連接失敗:{str(e)}")
|
|||
|
raise RuntimeError("無法連接 PACS 伺服器")
|
|||
|
|
|||
|
if assoc.is_established:
|
|||
|
try:
|
|||
|
status = assoc.send_c_store(ds)
|
|||
|
if status:
|
|||
|
logger.info(f'DICOM 傳送成功,狀態碼: 0x{status.Status:04x}')
|
|||
|
else:
|
|||
|
raise RuntimeError('傳送失敗:連接逾時或收到無效回應')
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"DICOM 傳送失敗:{str(e)}")
|
|||
|
raise
|
|||
|
finally:
|
|||
|
assoc.release()
|
|||
|
else:
|
|||
|
raise RuntimeError('傳送失敗:無法建立連接')
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"DICOM 傳送過程發生錯誤:{str(e)}", exc_info=True)
|
|||
|
raise
|
|||
|
|
|||
|
def main():
|
|||
|
"""主程式進入點"""
|
|||
|
if len(sys.argv) < 3:
|
|||
|
print('使用方式:', sys.argv[0], '<CT_DICOM_路徑> <MR_DICOM_路徑>')
|
|||
|
print('範例: python oar.py /nn/3894670/20241111/CT/a /nn/3894670/20241111/MR/6')
|
|||
|
sys.exit(1)
|
|||
|
|
|||
|
logger.info('開始執行')
|
|||
|
logger.info('程式名稱: %s', sys.argv[0])
|
|||
|
logger.info('CT路徑: %s', sys.argv[1])
|
|||
|
logger.info('MR路徑: %s', sys.argv[2])
|
|||
|
|
|||
|
start_time = time.time()
|
|||
|
try:
|
|||
|
rtss_file = inference(sys.argv[1], sys.argv[2])
|
|||
|
SendDCM(rtss_file)
|
|||
|
logger.info(f"處理完成,總耗時:{time.time() - start_time:.2f} 秒")
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"處理過程發生錯誤:{str(e)}", exc_info=True)
|
|||
|
sys.exit(1)
|
|||
|
|
|||
|
if __name__ == '__main__':
|
|||
|
main()
|
|||
|
|