diff --git a/.gitignore b/.gitignore index 0e80e9a..0f34a57 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,5 @@ cython_debug/ # xfr *.xlsx -*.local.txt \ No newline at end of file +*.local.txt +*.nii.gz \ No newline at end of file diff --git a/src/pacs.py b/src/pacs.py index 37aad5d..7cc59a7 100644 --- a/src/pacs.py +++ b/src/pacs.py @@ -50,7 +50,8 @@ def login(id=USER_ID, pw=PASSWORD): app = Application(backend="uia").start(r'C:\Users\xfr\Desktop\PACS.exe') w = Desktop(backend="uia").window(title_re='.* - Microsoft​ Edge$') - w.wait('ready') + w.wait('ready', timeout=30) + # time.sleep(9) @@ -88,7 +89,10 @@ def save_tag(window, outdir): TDicomTagListForm = window['DICOM 標籤清單Dialog'] if TDicomTagListForm.exists(): - TDicomTagListForm.close() + try: + TDicomTagListForm.close() + except Exception as e: + logging.critical(e, exc_info=True) # log exception info at CRITICAL log level TDicomTagListForm.wait_not('exists', 30) TToolBar = window.child_window(class_name='TToolBar', found_index=2) @@ -110,7 +114,7 @@ def save_tag(window, outdir): NEdit.wait('ready', 99) edit = NEdit.wrapper_object() pahtname = os.path.join(outdir, edit.get_value()) - for p in glob.glob(pahtname + '*'): + for p in glob.glob(pahtname + '.*'): os.remove(p) edit.set_text(pahtname) dlg_save['存檔(S)Button'].wrapper_object().click() @@ -156,7 +160,7 @@ def save_study(chartno, outdir, only_tag): c = series[0] c.set_focus() mouse_click(c) - keyboard.send_keys('^i') + keyboard.send_keys('^i') #隱藏標籤 # logging.warning(str(TFlowPanel.get_properties())) # print(str(TFlowPanel.get_properties())) @@ -168,6 +172,7 @@ def save_study(chartno, outdir, only_tag): logging.warning(str(c.element_info)) # c.wait('ready') #AttributeError: 'UIAWrapper' object has no attribute 'wait' + keyboard.send_keys('{ESC}') c.set_focus() mouse_click(c) @@ -338,10 +343,11 @@ def save_patient(chartno, outdir, query = "CT,MR", only_tag=False): time.sleep(.1) + # exit() + logging.warning(chartno+" completed "+query) return 0 - # exit() actionlogger.enable() @@ -362,10 +368,12 @@ def main(): # logger.handlers[0].setFormatter(formatter) app, window = login() - save_patient('5344141', r'T:\0\5344141', query="CT,MR") + # save_patient('5344141', r'T:\0\5344141', query="CT,MR") # save_patient('5344141', r'T:\0\5344141', query="CT") # save_patient('2380784', r'T:\3\2380784', query=False) # save_study('2380784', outdir=r'T:\0\2380784') + # save_patient('6631848', r'T:\0\6631848', query='MR') + save_patient('6166521', r'T:\0\6166521', query='CT,MR') diff --git a/src/uni2nii.py b/src/uni2nii.py new file mode 100644 index 0000000..0788f49 --- /dev/null +++ b/src/uni2nii.py @@ -0,0 +1,481 @@ + +import csv +import itertools +import os +import re + +import struct + +import numpy as np +import SimpleITK as sitk + + +def get_image_size_imghdr(fname): + try: + with open(fname, 'rb') as fhandle: + head = fhandle.read(24) + if len(head) != 24: + return None + # if imghdr.what(fname) == 'jpeg': + if True: + try: + fhandle.seek(0) + size = 2 + ftype = 0 + while not 0xc0 <= ftype <= 0xcf: + fhandle.seek(size, 1) + byte = fhandle.read(1) + while ord(byte) == 0xff: + byte = fhandle.read(1) + ftype = ord(byte) + size = struct.unpack('>H', fhandle.read(2))[0] - 2 + fhandle.seek(1, 1) + height, width = struct.unpack('>HH', fhandle.read(4)) + return width, height + except Exception: + return None + except FileNotFoundError: + return "File not found" + except Exception as e: + return f"Error: {e}" + + +class Series: + def __init__(self, root): + self.root = root + self.jpgs = {} + self.txts = [] + self.headers = {} + + def read_csv(file): + r = {} + with open(file, encoding="utf-8") as csvfile: + rows = csv.reader(csvfile) + + for row in rows: + # print(row) + if len(row)>4: + r[row[4]] = row[3] + + return r + + + def sanitize_filename(filename: str, replacement: str = "_") -> str: + """ + Remove invalid characters from a filename. + + Parameters: + - filename: The original filename + - replacement: Character to replace invalid ones with (default is "_") + + Returns: + - A sanitized version of the filename + """ + # Define a regex pattern of invalid filename characters + # Windows invalid characters: \ / : * ? " < > | + invalid_chars = r'[\\/:*?"<>|]' + sanitized = re.sub(invalid_chars, replacement, filename) + + # Optionally, strip leading/trailing whitespace or dots + sanitized = sanitized.strip().strip(".") + + return sanitized + + def add_txt(self, root, name): + # print(name) + p = os.path.join(root, name) + self.txts.append(p) + ins = int(name.split('_')[4][:-4]) + r = Series.read_csv(p) + self.headers[ins] = r + # print(r['Image Position (Patient)']) + # print(r['Image Orientation (Patient)']) + # print(r) + # exit() + + + + + def add_jpg(self, root, name): + ins = int(name.split('_')[4]) + p = os.path.join(root, name) + size = get_image_size_imghdr(p) + if size not in self.jpgs: + self.jpgs[size] = {} + # print(size, p) + self.jpgs[size][ins] = p + + def write_pairs(self, pair, value, outdir): + a, b = pair + norm1, size1 = value + + # print(size1) + + # print(self.jpgs.keys()) + # k, v = sorted(self.jpgs.items(), key=lambda item: -len(item[1]))[0] + # print(k) + + v = self.jpgs[size1] + fileNames = [] + for ins in sorted(v.keys()): + # print(get_image_size_imghdr(v[ins])) + fileNames.append(v[ins]) + + # fileNames = [] + # for ins in sorted(self.jpgs.keys()): + # print(get_image_size_imghdr(self.jpgs[ins])) + # fileNames.append(self.jpgs[ins]) + + image = sitk.ReadImage(fileNames) + + i0 = sitk.VectorIndexSelectionCast(image, 0) + i1 = sitk.VectorIndexSelectionCast(image, 1) + i2 = sitk.VectorIndexSelectionCast(image, 2) + + image = i0 + + # print(image) + + # keys = sorted(self.headers.keys()) + # a = keys[0] + # b = keys[-1] + # print(self.headers[a]['Image Position (Patient)']) + # print(self.headers[a]['Image Orientation (Patient)']) + # print(self.headers[b]['Image Position (Patient)']) + # print(self.headers[b]['Image Orientation (Patient)']) + + + # if "Image Orientation (Patient)" not in self.headers[a]: + # print('skip due to no "Image Orientation (Patient)"') + # return + + Orientation = np.array([float(p) for p in self.headers[a]['Image Orientation (Patient)'].split('\\')]) + Spacing = np.array([float(p) for p in self.headers[a]['Pixel Spacing'].split('\\')]) + + p1 = np.array([float(p) for p in self.headers[a]['Image Position (Patient)'].split('\\')]) + p2 = np.array([float(p) for p in self.headers[b]['Image Position (Patient)'].split('\\')]) + + diff_pos = (p2-p1)/(b-a) + norm = np.linalg.norm(diff_pos) + # print(p1) + # print(p2) + # print(norm) + + if norm < 0.1: + print('skip due to small norm', norm) + return + + unit_vec = diff_pos / norm + + # print(unit_vec) + # print(Orientation) + + o0 = Orientation[:3] + o1 = Orientation[3:] + # print(np.cross(o0, o1)) + + d = np.concatenate([Orientation, unit_vec]) + # d = np.concatenate([o1, o0, unit_vec]) + # d = np.concatenate([o1, unit_vec, o0]) + d = np.linalg.inv(d.reshape(3,3)).flatten() + s = np.append(Spacing, norm) + + image.SetDirection(d) + image.SetOrigin(p1) + image.SetSpacing(s) + + + # print(image) + # print(d) + # print(s) + + date = self.headers[a]["Study Date"].strip() + + name = '%s-%s-%s.nii.gz' % (date, self.headers[a]['Series Number'].strip(), self.headers[a]['Series Description'].strip()) + sitk.WriteImage(image, os.path.join(outdir, Series.sanitize_filename(name))) + + + def write_single(self, a, outdir): + fileNames = [] + for ins in sorted(self.jpgs.keys()): + fileNames.append(self.jpgs[ins]) + + image = sitk.ReadImage(fileNames) + + i0 = sitk.VectorIndexSelectionCast(image, 0) + i1 = sitk.VectorIndexSelectionCast(image, 1) + i2 = sitk.VectorIndexSelectionCast(image, 2) + + image = i0 + + Orientation = np.array([float(p) for p in self.headers[a]['Image Orientation (Patient)'].split('\\')]) + Spacing = np.array([float(p) for p in self.headers[a]['Pixel Spacing'].split('\\')]) + + o0 = Orientation[:3] + o1 = Orientation[3:] + unit_vec = np.cross(o0, o1) + + d = np.concatenate([Orientation, unit_vec]) + d = np.linalg.inv(d.reshape(3,3)).flatten() + + print(Orientation) + print(d) + + p1 = np.array([float(p) for p in self.headers[a]['Image Position (Patient)'].split('\\')]) + + norm = float(self.headers[a]["Slice Thickness"]) + s = np.append(Spacing, norm) + + image.SetDirection(d) + image.SetOrigin(p1) + image.SetSpacing(s) + + + # print(image) + # print(d) + # print(s) + + name = '%s-%d-%s.nii.gz' % (self.headers[a]['Series Number'].strip(), a, self.headers[a]['Series Description'].strip()) + # print(name) + sitk.WriteImage(image, os.path.join(outdir, Series.sanitize_filename(name))) + + return + + exit() + + + + + p1 = np.array([float(p) for p in self.headers[a]['Image Position (Patient)'].split('\\')]) + p2 = np.array([float(p) for p in self.headers[b]['Image Position (Patient)'].split('\\')]) + + diff_pos = (p2-p1)/(b-a) + norm = np.linalg.norm(diff_pos) + print(p1) + print(p2) + print(norm) + + if norm < 0.1: + print('skip due to small norm', norm) + return + + unit_vec = diff_pos / norm + + print(unit_vec) + print(Orientation) + + o0 = Orientation[:3] + o1 = Orientation[3:] + # print(np.cross(o0, o1)) + + d = np.concatenate([Orientation, unit_vec]) + # d = np.concatenate([o1, o0, unit_vec]) + # d = np.concatenate([o1, unit_vec, o0]) + d = np.linalg.inv(d.reshape(3,3)).flatten() + s = np.append(Spacing, norm) + + image.SetDirection(d) + image.SetOrigin(p1) + image.SetSpacing(s) + + + # print(image) + # print(d) + # print(s) + + name = '%s-%s.nii.gz' % (self.headers[a]['Series Number'].strip(), self.headers[a]['Series Description'].strip()) + sitk.WriteImage(image, os.path.join(outdir, Series.sanitize_filename(name))) + + + + def write(self, outdir): + + # if len(self.jpgs)<9: + # return + + valid_keys = [] + + pairs = {} + + + for k in sorted(self.headers.keys()): + header = self.headers[k] + if "Image Orientation (Patient)" not in header: + continue + valid_keys.append(k) + + # print(valid_keys) + # exit() + + if not valid_keys: + return + + a = valid_keys[0] + name = '%s-%s.nii.gz' % (self.headers[a]['Series Number'].strip(), self.headers[a]['Series Description'].strip()) + # print(name, valid_keys) + + if len(valid_keys) > 1: + for c in itertools.combinations(valid_keys, 2): + a, b = c + + size1 = int(self.headers[a]["Columns"]),int(self.headers[a]["Rows"]) + size2 = int(self.headers[b]["Columns"]),int(self.headers[b]["Rows"]) + if size1 != size2: + continue + + o1 = np.array([float(o) for o in self.headers[a]["Image Orientation (Patient)"].split('\\')]) + cr1 = np.cross(o1[:3], o1[3:]) + o2 = np.array([float(o) for o in self.headers[b]["Image Orientation (Patient)"].split('\\')]) + cr2 = np.cross(o2[:3], o2[3:]) + dot = np.dot(cr1, cr2) + if dot < -0.9: + pass + elif dot > 0.9: + p1 = np.array([float(o) for o in self.headers[a]["Image Position (Patient)"].split('\\')]) + p2 = np.array([float(o) for o in self.headers[b]["Image Position (Patient)"].split('\\')]) + pairs[c] = np.linalg.norm(p2-p1), size1 + # else: + # print(a, o1) + # print(b, o2) + # print(c, dot) + + if pairs: + for k, v in pairs.items(): + # continue + print(name, k, v) + self.write_pairs(k, v, outdir) + break + # else: + # for k in valid_keys: + # self.write_single(k, outdir) + + return + + fileNames = [] + for ins in sorted(self.jpgs.keys()): + fileNames.append(self.jpgs[ins]) + + image = sitk.ReadImage(fileNames) + + i0 = sitk.VectorIndexSelectionCast(image, 0) + i1 = sitk.VectorIndexSelectionCast(image, 1) + i2 = sitk.VectorIndexSelectionCast(image, 2) + + image = i0 + + # print(image) + + keys = sorted(self.headers.keys()) + a = keys[0] + b = keys[-1] + # print(self.headers[a]['Image Position (Patient)']) + # print(self.headers[a]['Image Orientation (Patient)']) + # print(self.headers[b]['Image Position (Patient)']) + # print(self.headers[b]['Image Orientation (Patient)']) + + + if "Image Orientation (Patient)" not in self.headers[a]: + print('skip due to no "Image Orientation (Patient)"') + return + + Orientation = np.array([float(p) for p in self.headers[a]['Image Orientation (Patient)'].split('\\')]) + Spacing = np.array([float(p) for p in self.headers[a]['Pixel Spacing'].split('\\')]) + + p1 = np.array([float(p) for p in self.headers[a]['Image Position (Patient)'].split('\\')]) + p2 = np.array([float(p) for p in self.headers[b]['Image Position (Patient)'].split('\\')]) + + diff_pos = (p2-p1)/(b-a) + norm = np.linalg.norm(diff_pos) + print(p1) + print(p2) + print(norm) + + if norm < 0.1: + print('skip due to small norm', norm) + return + + unit_vec = diff_pos / norm + + print(unit_vec) + print(Orientation) + + o0 = Orientation[:3] + o1 = Orientation[3:] + # print(np.cross(o0, o1)) + + d = np.concatenate([Orientation, unit_vec]) + # d = np.concatenate([o1, o0, unit_vec]) + # d = np.concatenate([o1, unit_vec, o0]) + d = np.linalg.inv(d.reshape(3,3)).flatten() + s = np.append(Spacing, norm) + + image.SetDirection(d) + image.SetOrigin(p1) + image.SetSpacing(s) + + + # print(image) + # print(d) + # print(s) + + sitk.WriteImage(image, os.path.join(outdir, Series.sanitize_filename(name))) + + + def dump(self): + # for f in self.jpgs: + # print(f) + # print(len(self.jpgs)) + for f in self.txts: + print(f) + print(len(self.jpgs), len(self.txts)) + + +class Study: + def __init__(self, indir): + self.root = indir + self.series = {} + + for root, dirs, files in os.walk(self.root): + for name in files: + series_num = int(name.split('_')[3]) + if series_num not in self.series: + self.series[series_num] = Series(root) + if name.endswith('.jpg'): + self.series[series_num].add_jpg(root, name) + if name.endswith('.txt'): + self.series[series_num].add_txt(root, name) + # series[series_num].txts.append(os.path.join(root, name)) + + def write(self, outdir): + os.makedirs(outdir, exist_ok=True) + for k in sorted(self.series.keys()): + # if k != 3: + # continue + self.series[k].write(outdir) + + +# "Protocol Name" + + + +def main(): + + # s = Study(r'T:\0\6166521\20250513_MR_2316') + # s = Study(r'T:\0\6166521\20220330_MR_1690') + # s = Study(r'T:\0\6166521\20210625_MR_693') + # s = Study(r'T:\0\6166521\20231117_MR_1983') + # s.write('0/') + # exit() + + for e in os.scandir(r'T:\0\6166521'): + if e.is_dir(): + indir = e.path + outdir = os.path.join('0/', e.name) + print(indir) + # continue + s = Study(indir) + s.write(outdir) + + +if __name__ == '__main__': + main()