123/onlylian/oar2.py
2025-02-01 15:57:22 +08:00

707 lines
No EOL
27 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pathlib import Path
from typing import Dict, List, Optional, Tuple
import os
import shutil
import subprocess
import sys
import time
import logging
import colorsys
from nipype.interfaces.dcm2nii import Dcm2niix
from pydicom import dcmread
from pynetdicom import AE, debug_logger
from pynetdicom.sop_class import CTImageStorage, RTStructureSetStorage
from rt_utils import RTStructBuilder
import numpy as np
import SimpleITK as sitk
from pyradise.data import (Subject, IntensityImage, SegmentationImage,
Modality, Organ, Annotator)
from pyradise.fileio.extraction import SimpleModalityExtractor
import pyradise.data as ps_data
import pyradise.fileio as ps_io
from registration.best_reg import reg_transform
class ExampleModalityExtractor(ps_io.ModalityExtractor):
def extract_from_dicom(self,
path: str
) -> Optional[ps_data.Modality]:
# Extract the necessary attributes from the DICOM file
tags = (ps_io.Tag((0x0008, 0x0060)), # Modality
ps_io.Tag((0x0008, 0x103e))) # Series Description
dataset_dict = self._load_dicom_attributes(tags, path)
# Identify the modality rule-based
modality = dataset_dict.get('Modality', {}).get('value', None)
series_desc = dataset_dict.get('Series Description', {}).get('value', '')
if modality == 'MR':
if 't1' in series_desc.lower():
return ps_data.Modality('T1')
elif 't2' in series_desc.lower():
return ps_data.Modality('T2')
else:
return None
elif modality == 'CT':
return ps_data.Modality('CT')
else:
return None
def extract_from_path(self,
path: str
) -> Optional[ps_data.Modality]:
if 'CT' in path:
return ps_data.Modality('CT')
# Identify the discrete image file's modality rule-based
filename = os.path.basename(path)
# Check if the image contains an img prefix
# (i.e., it is a intensity image)
if not filename.startswith('img'):
return None
# Check if the image contains a modality search string
if 'T1' in filename:
return ps_data.Modality('T1')
elif 'T2' in filename:
return ps_data.Modality('T2')
else:
return None
def generate_distinct_colors(n):
"""生成 n 個視覺上區分度高的顏色"""
colors = []
for i in range(n):
# 使用黃金比例來產生分散的色相值
hue = (i * 0.618033988749895) % 1
# 使用較高的飽和度和亮度以確保顏色明顯
saturation = 0.7 + (i % 3) * 0.1 # 0.7-0.9
value = 0.8 + (i % 2) * 0.1 # 0.8-0.9
# 轉換 HSV 到 RGB
rgb = colorsys.hsv_to_rgb(hue, saturation, value)
# 轉換到 0-255 範圍
color = [int(x * 255) for x in rgb]
colors.append(color)
return colors
# 設定顏色映射
LABEL_COLORS: Dict[str, List[int]] = {
"Brainstem": [0, 255, 0], # 綠色
"Right Eye": [255, 255, 224], # 淺黃色
"Left Eye": [255, 255, 224], # 淺黃色
"Optic Chiasm": [0, 0, 255], # 藍色
"Right_Optic_Nerve": [255, 127, 80], # 珊瑚紅
"Left_Optic_Nerve": [255, 127, 80] # 珊瑚紅
}
# 設定標籤映射
LABEL_MAPPING: Dict[int, str] = {
1: "Brainstem",
2: "Right Eye",
3: "Left Eye",
4: "Optic Chiasm",
5: "Right_Optic_Nerve",
6: "Left_Optic_Nerve"
}
# 設定日誌
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('medical_imaging.log')
]
)
logger = logging.getLogger(__name__)
def validate_dicom_directory(dicom_path):
"""加強版 DICOM 目錄驗證"""
try:
if not os.path.exists(dicom_path):
return False, f"目錄不存在:{dicom_path}"
if not os.listdir(dicom_path):
return False, f"目錄是空的:{dicom_path}"
dicom_files = []
non_dicom = []
corrupt_files = []
series_info = {}
for filename in os.listdir(dicom_path):
filepath = os.path.join(dicom_path, filename)
if os.path.isfile(filepath):
try:
ds = dcmread(filepath)
if hasattr(ds, 'SeriesInstanceUID'):
series_uid = ds.SeriesInstanceUID
if series_uid not in series_info:
series_info[series_uid] = []
series_info[series_uid].append(filepath)
dicom_files.append(filepath)
else:
non_dicom.append(filename)
except Exception as e:
logger.warning(f"無法讀取檔案 {filename}: {str(e)}")
if filename.endswith(('.dcm', '.DCM')):
corrupt_files.append(filename)
else:
non_dicom.append(filename)
if not dicom_files:
message = f"{dicom_path} 中找不到有效的 DICOM 檔案\n"
if non_dicom:
message += f"發現 {len(non_dicom)} 個非 DICOM 檔案\n"
if corrupt_files:
message += f"發現 {len(corrupt_files)} 個損壞的 DICOM 檔案"
return False, message
if len(series_info) > 1:
logger.warning(f"發現多個系列: {len(series_info)}")
for series_uid, files in series_info.items():
if len(files) < 5:
logger.warning(f"系列 {series_uid} 的影像數量過少: {len(files)}")
return True, f"找到 {len(dicom_files)} 個有效的 DICOM 檔案,分屬 {len(series_info)} 個系列"
except Exception as e:
return False, f"驗證過程發生錯誤:{str(e)}"
def enhanced_dcm2nii(source_dir, output_dir):
"""加強版的 dcm2nii 轉換函式"""
try:
is_valid, message = validate_dicom_directory(source_dir)
if not is_valid:
return False, message, []
os.makedirs(output_dir, exist_ok=True)
initial_files = set(os.listdir(output_dir))
converter = Dcm2niix()
converter.inputs.source_dir = source_dir
converter.inputs.output_dir = output_dir
converter.inputs.compress = 'y'
converter.inputs.merge_imgs = True
converter.inputs.single_file = True
converter.inputs.verbose = True
converter.terminal_output = 'stream'
logger.info(f"執行 DICOM 轉換:{source_dir}")
try:
result = converter.run()
if hasattr(result, 'runtime'):
if hasattr(result.runtime, 'stdout'):
logger.info(f"dcm2niix 輸出:\n{result.runtime.stdout}")
if hasattr(result.runtime, 'stderr'):
logger.warning(f"dcm2niix 錯誤:\n{result.runtime.stderr}")
except Exception as run_error:
logger.error(f"dcm2niix 執行錯誤:{str(run_error)}")
final_files = set(os.listdir(output_dir))
new_files = final_files - initial_files
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
if not nifti_files:
logger.warning("使用備用轉換選項重試...")
converter.inputs.merge_imgs = False
converter.inputs.single_file = False
try:
result = converter.run()
except Exception as retry_error:
logger.error(f"備用轉換選項執行錯誤:{str(retry_error)}")
final_files = set(os.listdir(output_dir))
new_files = final_files - initial_files
nifti_files = [f for f in new_files if f.endswith(('.nii.gz', '.nii'))]
if not nifti_files:
return False, "轉換完成但未產生 NIfTI 檔案", []
valid_nifti = []
for nii_file in nifti_files:
try:
img = sitk.ReadImage(os.path.join(output_dir, nii_file))
if img.GetSize()[2] > 1:
valid_nifti.append(nii_file)
else:
logger.warning(f"檔案 {nii_file} 不是有效的3D影像")
except Exception as e:
logger.warning(f"無法讀取 NIfTI 檔案 {nii_file}: {str(e)}")
if not valid_nifti:
return False, "轉換產生的 NIfTI 檔案無效", []
return True, f"成功轉換 {len(valid_nifti)} 個有效的 NIfTI 檔案", valid_nifti
except Exception as e:
logger.error(f"DICOM 轉換錯誤:{str(e)}", exc_info=True)
return False, f"轉換過程發生錯誤:{str(e)}", []
def process_symmetric_structures(image_array):
"""處理對稱結構的後處理函數"""
logger.info("開始處理對稱結構")
result_img = image_array.copy()
# 定義需要處理的結構及其標籤
structures = [
{'name': '眼睛', 'labels': (2, 3), 'kernel_radius': 2},
{'name': '視神經', 'labels': (5, 6), 'kernel_radius': 2}
]
# 處理每個對稱結構
for structure in structures:
logger.info(f"處理{structure['name']}結構")
# 合併左右標籤
combined_mask = np.zeros_like(result_img, dtype=np.uint8)
for label in structure['labels']:
combined_mask |= (result_img == label)
# 形態學閉運算
combined_mask_sitk = sitk.GetImageFromArray(combined_mask)
closing_filter = sitk.BinaryMorphologicalClosingImageFilter()
closing_filter.SetKernelRadius(structure['kernel_radius'])
processed_mask = sitk.GetArrayFromImage(closing_filter.Execute(combined_mask_sitk))
# 連通區域標記
labeled_components = sitk.GetArrayFromImage(
sitk.ConnectedComponent(sitk.GetImageFromArray(processed_mask))
)
# 找出最大的兩個區域
unique, counts = np.unique(labeled_components, return_counts=True)
if len(unique) >= 3: # 確保至少有兩個區域(除了背景)
region_sizes = [(i, count) for i, count in zip(unique[1:], counts[1:])]
sorted_regions = sorted(region_sizes, key=lambda x: x[1], reverse=True)
# 獲取最大的兩個區域
region1 = (labeled_components == sorted_regions[0][0])
region2 = (labeled_components == sorted_regions[1][0])
# 根據質心的x座標決定左右
center1 = np.mean(np.where(region1)[2])
center2 = np.mean(np.where(region2)[2])
# 分配左右標籤
if center1 < center2:
right, left = region1, region2
else:
right, left = region2, region1
# 更新結果
result_img[right] = structure['labels'][0] # 右側標籤
result_img[left] = structure['labels'][1] # 左側標籤
logger.info(f"完成{structure['name']}的左右結構處理")
else:
logger.warning(f"無法找到足夠的{structure['name']}區域進行處理")
return result_img
def process_multiple_tumors(image_array):
"""處理多個腫瘤的函數"""
logger.info("開始處理多個腫瘤")
result_img = image_array.copy()
# 假設原始標記中 7 代表腫瘤
tumor_mask = (result_img == 7)
if not tumor_mask.any():
logger.warning("未找到腫瘤區域")
return result_img
# 使用連通區域分析找出不同的腫瘤
tumor_mask_sitk = sitk.GetImageFromArray(tumor_mask.astype(np.uint8))
connected_filter = sitk.ConnectedComponentImageFilter()
connected_filter.FullyConnectedOn()
labeled_tumors = sitk.GetArrayFromImage(connected_filter.Execute(tumor_mask_sitk))
num_tumors = connected_filter.GetObjectCount()
logger.info(f"找到 {num_tumors} 個腫瘤")
if num_tumors == 0:
return result_img
# 為每個腫瘤生成唯一的標籤值,從最大現有標籤值開始
max_existing_label = max(LABEL_MAPPING.keys())
tumor_colors = generate_distinct_colors(num_tumors)
# 更新標籤映射和顏色映射
for i in range(num_tumors):
tumor_label = max_existing_label + i + 1
tumor_name = f"TV_{i+1}"
LABEL_MAPPING[tumor_label] = tumor_name
LABEL_COLORS[tumor_name] = tumor_colors[i]
# 更新結果圖像中的腫瘤標籤
result_img[labeled_tumors == (i + 1)] = tumor_label
logger.info(f"標記腫瘤 {tumor_name} 使用標籤 {tumor_label} 和顏色 {tumor_colors[i]}")
return result_img
def create_structure_masks(image: sitk.Image) -> Dict[str, np.ndarray]:
"""從分割結果創建每個結構的遮罩"""
array = sitk.GetArrayFromImage(image)
masks = {}
for label_id, structure_name in LABEL_MAPPING.items():
mask = (array == label_id)
if mask.any():
masks[structure_name] = np.transpose(mask, (1, 2, 0))
return masks
def inference(DCM_CT, DCM_MR):
"""執行影像推論和後處理"""
logger.info(f"處理影像\nCT: {DCM_CT}\nMR: {DCM_MR}")
try:
# 驗證# 驗證輸入目錄
for path, desc in [(DCM_CT, "CT"), (DCM_MR, "MR")]:
is_valid, message = validate_dicom_directory(path)
if not is_valid:
raise ValueError(f"{desc} 影像驗證失敗:{message}")
# 設定工作目錄
ROOT_DIR = os.path.dirname(os.path.dirname(DCM_CT))
NII_DIR = os.path.join(ROOT_DIR, 'nii')
INPUT_DIR = os.path.join(ROOT_DIR, 'input')
OUTPUT_DIR = os.path.join(ROOT_DIR, 'output')
# 設定 RTSTRUCT 輸出路徑
head, tail = os.path.split(DCM_CT)
rtss_file = os.path.join(head, tail+'-rtss.dcm')
# 清理並建立工作目錄
for dir_path in [NII_DIR, INPUT_DIR, OUTPUT_DIR]:
shutil.rmtree(dir_path, ignore_errors=True)
os.makedirs(dir_path, exist_ok=True)
# 取得基本檔名
nCT = os.path.basename(DCM_CT)
nMR = os.path.basename(DCM_MR)
# DICOM 轉 NIfTI
logger.info("開始轉換 CT DICOM...")
success_ct, message_ct, ct_files = enhanced_dcm2nii(DCM_CT, NII_DIR)
if not success_ct:
raise RuntimeError(f"CT 轉換失敗:{message_ct}")
logger.info("開始轉換 MR DICOM...")
success_mr, message_mr, mr_files = enhanced_dcm2nii(DCM_MR, NII_DIR)
if not success_mr:
raise RuntimeError(f"MR 轉換失敗:{message_mr}")
# 尋找轉換後的檔案
NII_CT = None
NII_MR = None
for f in os.listdir(NII_DIR):
if f.endswith('.nii.gz'):
full_path = os.path.join(NII_DIR, f)
if f.startswith(nCT+'_'):
NII_CT = full_path
logger.info(f"找到 CT NIfTI 檔案:{f}")
elif f.startswith(nMR+'_'):
NII_MR = full_path
logger.info(f"找到 MR NIfTI 檔案:{f}")
if not NII_CT:
raise FileNotFoundError(f"找不到 CT 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
if not NII_MR:
raise FileNotFoundError(f"找不到 MR 的 NIfTI 檔案,目錄內容:{os.listdir(NII_DIR)}")
# 準備輸入檔案
basename = os.path.basename(NII_MR)
old = '_'+basename.split('_')[-1]
input_file = os.path.join(INPUT_DIR, basename.replace(old, '_0000.nii.gz'))
output_file = os.path.join(OUTPUT_DIR, basename.replace(old, '.nii.gz'))
basename_ct = os.path.basename(NII_CT)
old_ct = '_'+basename_ct.split('_')[-1]
label_file = os.path.join(NII_DIR, basename_ct.replace(old_ct, '.label.nii.gz'))
logger.info(f"複製 MR NIfTI 到輸入目錄:{input_file}")
shutil.copy(NII_MR, input_file)
logger.info("準備執行模型推論...")
# 設定 nnUNet 環境變數
model_dir = "/123/onlylian"
os.environ['nnUNet_raw'] = "/123/onlylian/nnUNet_raw"
os.environ['nnUNet_preprocessed'] = "/123/onlylian/nnUNet_preprocessed"
os.environ['nnUNet_results'] = "/123/onlylian/nnUNet_results"
# 明確設定可用的資料集路徑
dataset_dir = os.path.join(model_dir, "nnUNet_results", "Dataset505")
logger.info(f"使用資料集目錄:{dataset_dir}")
if not os.path.exists(dataset_dir):
logger.error(f"資料集目錄不存在:{dataset_dir}")
raise FileNotFoundError(f"找不到資料集目錄:{dataset_dir}")
try:
# 執行模型推論
predict_cmd = [
"nnUNetv2_predict",
"-i", INPUT_DIR,
"-o", OUTPUT_DIR,
"-d", "505",
"-c", "3d_fullres",
"-f", "0",
"-tr", "nnUNetTrainer",
"-p", "nnUNetPlans"
]
logger.info(f"執行推論指令:{' '.join(predict_cmd)}")
result = subprocess.run(
predict_cmd,
check=True,
capture_output=True,
text=True,
env=os.environ
)
logger.info(f"模型推論輸出:\n{result.stdout}")
except subprocess.CalledProcessError as e:
logger.error(f"模型推論失敗:\n輸出:{e.output}\n錯誤:{e.stderr}")
raise RuntimeError(f"模型推論失敗:{e.stderr}")
if not os.path.exists(output_file):
raise FileNotFoundError(f"找不到模型輸出檔案:{output_file}")
logger.info("開始執行影像後處理...")
try:
# 讀取預測結果
predicted_image = sitk.ReadImage(output_file)
predicted_image = sitk.DICOMOrient(predicted_image)
predicted_array = sitk.GetArrayFromImage(predicted_image)
# 執行後處理
processed_array = process_symmetric_structures(predicted_array)
processed_array = process_multiple_tumors(processed_array)
processed_image = sitk.GetImageFromArray(processed_array)
processed_image.CopyInformation(predicted_image)
# 暫時保存後處理結果
processed_output = os.path.join(OUTPUT_DIR, 'processed_' + os.path.basename(output_file))
sitk.WriteImage(processed_image, processed_output)
logger.info(f"後處理結果已保存至:{processed_output}")
logger.info("開始執行影像配準...")
reg_transform(NII_CT, NII_MR, processed_output, label_file)
except Exception as e:
logger.error(f"影像處理失敗:{str(e)}")
raise RuntimeError("後處理或配準失敗")
if not os.path.exists(label_file):
raise FileNotFoundError(f"找不到配準後的標籤檔案:{label_file}")
logger.info("開始建立 RTSTRUCT...")
# 讀取 CT 影像序列
try:
reader = sitk.ImageSeriesReader()
dicom_names = reader.GetGDCMSeriesFileNames(DCM_CT)
reader.SetFileNames(dicom_names)
reader.MetaDataDictionaryArrayUpdateOn()
reader.LoadPrivateTagsOn()
image = reader.Execute()
except Exception as e:
logger.error(f"讀取 CT 系列失敗:{str(e)}")
raise RuntimeError("無法讀取 CT 影像系列")
# 讀取並重新取樣配準後的預測結果
try:
final_image = sitk.ReadImage(label_file)
final_image = sitk.Resample(
final_image,
image,
sitk.Transform(),
sitk.sitkNearestNeighbor
)
except Exception as e:
logger.error(f"重新取樣失敗:{str(e)}")
raise RuntimeError("預測結果重新取樣失敗")
# 建立 RTSTRUCT
try:
images = []
images.append(IntensityImage(image, Modality('CT')))
for k, v in LABEL_MAPPING.items():
images.append(SegmentationImage(final_image==k, v))
dcm_crawler = ps_io.SubjectDicomCrawler(DCM_CT,
modality_extractor=ExampleModalityExtractor(),
)
dicom_series_info = dcm_crawler.execute()
subject = Subject(dicom_series_info[0].get_patient_id(), images)
reference_modality = 'CT'
use_3d_conversion = False
if use_3d_conversion:
conv_conf = ps_io.RTSSConverter3DConfiguration()
else:
conv_conf = ps_io.RTSSConverter2DConfiguration()
meta_data = ps_io.RTSSMetaData(
series_description = '%s by onlylian'%type(conv_conf).__name__)
converter = ps_io.SubjectToRTSSConverter(
subject,
dicom_series_info,
reference_modality,
conv_conf,
meta_data,
# colors = tuple(LABEL_COLORS.values()),
)
rtss = converter.convert()
# Write the DICOM-RTSS to a separate subject directory
# and include the DICOM files crawled before
# Note: If you want to output just a subset of the
# original DICOM files you may use additional selectors
output_dir_path = os.path.join(ROOT_DIR, 'pyradise')
rtss_filename = os.path.basename(rtss_file)
rtss_combination = ((rtss_filename, rtss),)
print(output_dir_path)
print(rtss_combination)
# shutil.rmtree(output_dir_path, ignore_errors=True)
os.makedirs(output_dir_path, exist_ok=True)
writer = ps_io.DicomSeriesSubjectWriter()
writer.write(rtss_combination,
output_dir_path,
# subject.get_name(),
None,
dicom_series_info,)
shutil.copy(os.path.join(output_dir_path, rtss_filename), rtss_file)
# rtstruct = RTStructBuilder.create_new(dicom_series_path=DCM_CT)
# # 為每個解剖結構創建遮罩
# structure_masks = create_structure_masks(final_image)
# # 確認是否有找到任何結構
# if not structure_masks:
# logger.warning("未找到任何有效的結構遮罩")
# # 添加每個結構到 RTSTRUCT
# for structure_name, mask in structure_masks.items():
# if mask.any(): # 確保遮罩不是空的
# logger.info(f"添加結構 {structure_name}")
# try:
# rtstruct.add_roi(
# mask=mask,
# color=LABEL_COLORS[structure_name],
# name=structure_name
# )
# logger.info(f"成功添加 {structure_name}")
# except Exception as roi_error:
# logger.error(f"添加 ROI {structure_name} 時發生錯誤: {str(roi_error)}")
# continue
# logger.info(f"儲存 RTSTRUCT{rtss_file}")
# rtstruct.save(rtss_file)
except Exception as e:
logger.error(f"RTSTRUCT 生成失敗:{str(e)}")
raise RuntimeError("無法生成或儲存 RTSTRUCT")
return rtss_file
except Exception as e:
logger.error(f"推論過程發生錯誤:{str(e)}", exc_info=True)
raise
def SendDCM(fp):
"""傳送 DICOM 檔案到 PACS"""
logger.info(f"準備傳送 DICOM 檔案:{fp}")
debug_logger()
if not os.path.exists(fp):
raise FileNotFoundError(f"找不到 DICOM 檔案:{fp}")
try:
ae = AE()
ae.ae_title = 'OUR_STORE_SCP'
ae.add_requested_context(RTStructureSetStorage)
try:
ds = dcmread(fp)
except Exception as e:
logger.error(f"讀取 DICOM 檔案失敗:{str(e)}")
raise RuntimeError("無法讀取 DICOM 檔案")
logger.info("正在連接 PACS 伺服器...")
try:
assoc = ae.associate("172.16.40.36", 104, ae_title='N1000_STORAGE')
except Exception as e:
logger.error(f"PACS 連接失敗:{str(e)}")
raise RuntimeError("無法連接 PACS 伺服器")
if assoc.is_established:
try:
status = assoc.send_c_store(ds)
if status:
logger.info(f'DICOM 傳送成功,狀態碼: 0x{status.Status:04x}')
else:
raise RuntimeError('傳送失敗:連接逾時或收到無效回應')
except Exception as e:
logger.error(f"DICOM 傳送失敗:{str(e)}")
raise
finally:
assoc.release()
else:
raise RuntimeError('傳送失敗:無法建立連接')
except Exception as e:
logger.error(f"DICOM 傳送過程發生錯誤:{str(e)}", exc_info=True)
raise
def main():
"""主程式進入點"""
if len(sys.argv) < 3:
print('使用方式:', sys.argv[0], '<CT_DICOM_路徑> <MR_DICOM_路徑>')
print('範例: python oar.py /nn/3894670/20241111/CT/a /nn/3894670/20241111/MR/6')
sys.exit(1)
logger.info('開始執行')
logger.info('程式名稱: %s', sys.argv[0])
logger.info('CT路徑: %s', sys.argv[1])
logger.info('MR路徑: %s', sys.argv[2])
start_time = time.time()
try:
rtss_file = inference(sys.argv[1], sys.argv[2])
SendDCM(rtss_file)
logger.info(f"處理完成,總耗時:{time.time() - start_time:.2f}")
except Exception as e:
logger.error(f"處理過程發生錯誤:{str(e)}", exc_info=True)
sys.exit(1)
if __name__ == '__main__':
main()