"""basic utilies to deal with files""" import hashlib import base64 import gzip from datetime import datetime import os import tempfile import filecmp # import re import errno import json # import shotiser import sys import lxml.etree as etree from shutil import copyfile TF = '%Y-%m-%dT%H:%M:%SZ' def prepare(filename): dirname = os.path.dirname(filename) if dirname == '': return if not os.path.exists(dirname): try: os.makedirs(os.path.dirname(filename)) except OSError as exc: # Guard against race condition if exc.errno != errno.EEXIST: raise def install_xml(ele, fufi): temp_file_name = tempfile.NamedTemporaryFile(delete=False).name et = etree.ElementTree(ele) et.write(temp_file_name, pretty_print=True) prepare(fufi) if(os.path.exists(fufi)): if(filecmp.cmp(temp_file_name, fufi, shallow=True)): os.remove(temp_file_name) return copyfile(temp_file_name, fufi) os.remove(temp_file_name) def load(fufi, only_string=False): #if not os.isfile(fufi): # return None if(fufi[-3:] != '.gz'): with open(fufi) as the_file: data = json.load(the_file) return data with gzip.GzipFile(fufi, 'r') as json_file: json_bytes = json_file.read() json_string = json_bytes.decode('utf-8') if only_string: return json_string data = json.loads(json_string) return data def add(fufi, data): """load a set of keys from fufi in into data""" if not isinstance(data, dict): raise Exception("filing must add to a dict") added = load(fufi) for key in added: data[key] = added[key] return data def dump(a1 , a2, no_rewrite=False): if(isinstance(a1, str)): fufi = a1 data = a2 else: fufi = a2 data = a1 # # hopefully not needed prepare(fufi) dump_string = json.dumps(data, ensure_ascii=False, indent=1) if no_rewrite and os.path.isfile(fufi): old_string = load(fufi, only_string=True) if dump_string == old_string: return fufi dump_bytes = dump_string.encode('utf-8') if(fufi[-3:] != '.gz'): with open(fufi, 'w') as the_file: json.dump(data, the_file, indent=1) return fufi with gzip.GzipFile(fufi, 'w') as the_file: the_file.write(dump_bytes) del dump_bytes return fufi def mtime(fufi): mtime = os.stat(fufi).st_mtime out = datetime.fromtimestamp(mtime).strftime(TF) return out def sread(fufi): if(not os.path.isfile(fufi)): raise Exception(fufi + ' is not there.') if(fufi[-3:] != '.gz'): with open(fufi, "r") as the_file: string = the_file.read() else: with gzip.GzipFile(fufi, 'r') as the_file: string = the_file.read() string = string.decode() the_file.close() return string def srite(fufi, string, do_backup=False, do_verbose=False, do_change_check=True, do_preserve_time=None): if (do_preserve_time is True): time = os.path.getmtime(fufi) elif isinstance(do_preserve_time, float): time = do_preserve_time elif isinstance(do_preserve_time, int): time = do_preserve_time # # if exists, check for changes before writing if do_change_check and os.path.isfile(fufi): old_string = sread(fufi) if(old_string == string): if(do_verbose): print("filing: I keep " + fufi) return False the_file = open(fufi, 'w') if(fufi[-3:] != '.gz'): with open(fufi, 'w') as the_file: the_file.write(string) else: with gzip.open(fufi, 'wb') as the_file: the_file.write(string.encode()) the_file.close() if do_verbose: print("filing: I wrote " + fufi) if not os.path.isfile(fufi): raise Exception(f"{fufi} should have been written.") if do_preserve_time is not None: os.utime(fufi, (time, time)) return True def donere(out_fufi, in_fufis, do_verbose=False, do_allow_empty=False): """does need renewal""" if not os.path.isfile(out_fufi): if do_verbose: print("filing.donore does not see " + out_fufi) return True out_info = os.stat(out_fufi) out_size = out_info.st_size if out_size == 0 and not do_allow_empty: if do_verbose: print(f'{out_fufi} is empty') return True out_mtime = out_info.st_mtime if not isinstance(in_fufis, list): raise Exception('filing.donere needs a list of in_fufis') for in_fufi in in_fufis: if not os.path.isfile(in_fufi): print("donere does not see the in_fufi " + in_fufi, file=sys.stderr) continue in_info = os.stat(in_fufi) in_mtime = in_info.st_mtime if in_mtime > out_mtime: if do_verbose: print(f'{out_fufi} older than {in_fufi}') return True if do_verbose: print("filing.donere skips " + out_fufi) return False def parse_lax(fufi, parser=None): if parser is None: parser = etree.XMLParser(remove_blank_text=True) try: ## etree parses gzip transparently doc = etree.parse(fufi, parser) except OSError: print(f"filing can not open {fufi}", file=sys.stderr) return None except etree.XMLSyntaxError: print(fufi + " is not well formed", file=sys.stderr) return None return doc def md5(fufi): the_file = open(fufi, 'br') octets = the_file.read() the_file.close() out = base64.urlsafe_b64encode(hashlib.md5(octets).digest()).decode() out = out[:-2] return out def digest(string): octets = string.encode() out = base64.urlsafe_b64encode(hashlib.md5(octets).digest()).decode() out = out[:-2] return out def write_verfi(fufi, string, refdi, dig=None): if not fufi.startswith(refdi): raise Exception(f"{fufi} must start with {refdi}") if dig is None: dig = digest(string) ext = get_ext(fufi) if ext is None: raise Exception(f"filing has no extension for 'fufi'") len_ext = len(ext) target = fufi[:-len_ext] + '_' + dig + ext len_refdi = len(refdi) + 1 if not os.path.isfile(target): prepare(target) srite(target, string) link = refdi + '/' + dig arm = target[len_refdi:] if os.path.islink(link): return target os.symlink(arm, dig, dir_fd=os.open(refdi, os.O_DIRECTORY)) return target def get_ext(fina): bana = fina if fina.endswith('.gz'): bana = fina[:-3] exts = ('.xslt.xml', '.amf.xml', '.xml', '.json', '.txt') for ext in exts: if not bana.endswith(ext): continue if bana == fina: return ext return ext + '.gz' return None