import copy
import os
import string
import subprocess
import sys
from datetime import datetime

import lxml.etree as etree
from lxml.builder import ElementMaker

import docing
import filing
from nitpo import Nitpo
from surks import Surks
from xpaths import Xpaths


class Profile(Nitpo):
    """Profile documents stored as XML files, one per emad.

    Vocabulary used by the method names:

    * emad  -- an address of the form ``user@host``
    * relfi -- file path relative to the profile folder
    * fufi  -- file path including the profile folder
    * zufi  -- the fufi with a ``.gz`` suffix

    A profile document has a ``profile`` root element carrying an ``emad``
    attribute, with ``live`` and ``dead`` children that hold ``surk``
    elements.
    """

    def __init__(self, do_verbose=False):
        """subber profiles"""
        super().__init__()
        ns = self.const['ns']
        self.nsmap = {None: ns}
        # Clark-notation prefix, "{namespace}", for building element names
        self.N = "{%s}" % ns
        self.E = ElementMaker(nsmap=self.const['nsmap'])
        self.check_conf('folders', 'profile')
        self.to_print = ''
        self.s = Surks()
        self.xpaths = Xpaths()
        self.event = {}
        self.do_verbose = do_verbose

    def _ns_xpath(self, node, xp, **variables):
        """Evaluate *xp* on *node* using the namespace map in self.const.

        Keyword arguments are handed to lxml as XPath variables, so a value
        passed as ``repcode=...`` is referenced in *xp* as ``$repcode``.
        """
        return node.xpath(xp, namespaces=self.const['nsmap'], **variables)

    def _parse_fufi(self, fufi):
        """Parse the XML file *fufi*, dropping ignorable whitespace."""
        parser = etree.XMLParser(remove_blank_text=True)
        return etree.parse(fufi, parser)

    def relfi_from_emad(self, emad):
        """Map an emad to its relative profile file name.

        The emad is lowercased, the host labels are reversed into folders,
        and the user part becomes the file name: ``user@example.com``
        gives ``com/example/user.xml``.

        Returns:
            The relative path, or None (after printing a message) when
            *emad* contains no ``@``.
        """
        emad = emad.lower()
        if '@' not in emad:
            print(f"profile: {emad} is not an emad.")
            return None
        parts = emad.split('@')
        user = parts[0]
        host = parts[1]
        folder = '/'.join(reversed(host.split('.'))) + '/'
        # for p.o'brien --> p.o’brien
        user = user.replace("'", "’")
        return folder + user + '.xml'

    def emad_from_relfi(self, relfi):
        """Reverse relfi_from_emad: turn a relative file name into an emad.

        A leading ``./`` and a trailing ``.gz`` are ignored.

        Returns:
            The emad, or None when the name does not end in ``.xml``.
        """
        if relfi.startswith('./'):
            relfi = relfi[2:]
        if relfi.endswith('.gz'):
            relfi = relfi[:-3]
        if not relfi.endswith('.xml'):
            return None
        relfi = relfi[:-4]
        relfi = relfi.replace("’", "'")
        # last path component is the user, the folders are the host labels
        # in reverse order
        parts = relfi.split('/')
        parts.reverse()
        user = parts[0]
        domain = '.'.join(parts[1:])
        return user + '@' + domain

    def base_from_fufi(self, fufi):
        """Strip the profile folder and a trailing ``.xml`` from *fufi*."""
        base = fufi.replace(self.conf['folders']['profile'] + '/', '')
        if base.endswith('.xml'):
            base = base[:-4]
        return base

    def fufi_from_emad(self, emad):
        """Return the profile file path for *emad*.

        Returns:
            The ``.gz`` path when that file exists, otherwise the plain
            ``.xml`` path (whether or not it exists), or None when *emad*
            is not a valid emad.
        """
        relfi = self.relfi_from_emad(emad)
        if relfi is None:
            return None
        fufi = self.conf['folders']['profile'] + '/' + relfi
        zufi = fufi + '.gz'
        if os.path.isfile(zufi):
            return zufi
        return fufi

    def has(self, emad):
        """Report whether a profile file exists for *emad*.

        Returns:
            1 when the plain ``.xml`` file exists, -1 when only the
            ``.gz`` file exists, 0 when neither exists or *emad* is not a
            valid emad.
        """
        fufi = self.fufi_from_emad(emad)
        if fufi is None:
            # invalid emad: there is no path to look at
            if self.do_verbose:
                print(f"profile: no subber {emad}")
            return 0
        # fufi_from_emad hands back the '.gz' path only when that file
        # exists, so the suffix alone tells us the zipped file is there.
        is_zipped = fufi.endswith('.gz')
        plain_fufi = fufi[:-3] if is_zipped else fufi
        if os.path.isfile(plain_fufi):
            if self.do_verbose:
                print(f"profile: sees {emad}")
            return 1
        if is_zipped:
            if self.do_verbose:
                print(f"gone {emad}")
            return -1
        if self.do_verbose:
            print(f"profile: no subber {emad}")
        return 0

    def show(self, emad):
        """Print the profile of *emad*; do nothing when it has no file."""
        doc = self.load(emad, check_there=True)
        if doc is None:
            return
        print(docing.show(doc))

    def emad_from_fufi(self, fufi):
        """Parse the file *fufi* and return the emad recorded in it."""
        doc = self._parse_fufi(fufi)
        return self.emad_from_doc(doc)

    def fufi_from_doc(self, doc):
        """Return the file path that belongs to the emad stored in *doc*."""
        emad = self.emad_from_doc(doc)
        return self.fufi_from_emad(emad)

    def is_dead(self, doc):
        """is the profile dead

        A profile is dead when it has no live surk element or when its
        root carries a ``bounced`` attribute.
        """
        # xp = '/n:profile/n:live/n:surk'
        live_surks = self._ns_xpath(doc, '//n:profile/n:live/n:surk')
        if len(live_surks) == 0:
            return True
        # bounced email
        bounced = self._ns_xpath(doc, '/n:profile/@bounced')
        return len(bounced) > 0

    def is_bounced(self, doc):
        """Return what Xpaths.none_or_one finds for the ``bounced`` attribute."""
        xp = '/n:profile/@bounced'
        return self.xpaths.none_or_one(doc, xp)

    def write(self, doc, emad=None):
        """Serialize *doc* to the profile file of *emad*.

        A dead profile (see is_dead) is written under the ``.gz`` name, a
        live one under the plain name; if the file currently sits under
        the other name, that file is removed after writing.

        Args:
            doc: the profile document.
            emad: the emad to file under; read from *doc* when None.

        Returns:
            Whatever filing.srite returns for the write.
        """
        if emad is None:
            emad = self.emad_from_doc(doc)
        # # wrong output file, to delete
        fufi_to_delete = None
        is_dead = self.is_dead(doc)
        out_fufi = self.fufi_from_emad(emad)
        filing.prepare(out_fufi)
        if is_dead:
            if not out_fufi.endswith('.gz'):
                fufi_to_delete = out_fufi
                out_fufi += '.gz'
        elif out_fufi.endswith('.gz'):
            fufi_to_delete = out_fufi
            out_fufi = out_fufi[:-3]
        etree.cleanup_namespaces(doc)
        etree.register_namespace("n", self.const['ns'])
        out = etree.tostring(doc, pretty_print=True).decode()
        has_it_been_written = filing.srite(out_fufi, out,
                                           do_verbose=self.do_verbose)
        if fufi_to_delete is not None and os.path.isfile(fufi_to_delete):
            print(f"profile removes {fufi_to_delete}")
            os.remove(fufi_to_delete)
        return has_it_been_written

    def load(self, emad, check_there=False):
        """Load the profile document of *emad*.

        Args:
            emad: the emad whose profile is wanted.
            check_there: when true, a missing file yields None instead of
                a fresh skeleton document.

        Returns:
            The parsed document, a skeleton from skel() when the file is
            missing and *check_there* is false, or None.
        """
        fufi = self.fufi_from_emad(emad)
        if fufi is None:
            print(f"profile sees no fufi for {emad}")
            return None
        if not os.path.isfile(fufi):
            if check_there:
                return None
            return self.skel(emad)
        return self._parse_fufi(fufi)

    def read(self, fufi, check_there=False):
        """same as load, but with a fufi

        Raises:
            Exception: when *fufi* is not an existing file.
        """
        # check_there is accepted for signature parity with load; unused
        if not os.path.isfile(fufi):
            raise Exception(f"no such file: {fufi}")
        return self._parse_fufi(fufi)

    def skel(self, emad):
        """Build an empty profile document for *emad*.

        The root ``profile`` element gets the ``emad`` attribute and empty
        ``live`` and ``dead`` children.
        """
        profile_ele = self.E(self.N + 'profile',
                             self.E(self.N + 'live'),
                             self.E(self.N + 'dead'),
                             emad=emad)
        return etree.ElementTree(profile_ele)

    def delete_report(self, emad, repcode, dont_write=False, doc=None):
        """remove a report because it is closed

        The live surk for *repcode* is moved to the ``dead`` element: a
        copy gets an ``until`` timestamp, loses its ``id`` attribute, and
        is appended there, while the original is removed from ``live``.

        Args:
            emad: used to load the profile when *doc* is None.
            repcode: value of the ``repcode`` attribute to look for.
            dont_write: when true, the changed document is not written.
            doc: an already loaded profile document, optional.

        Returns:
            The document, unchanged when it has no live surk for *repcode*.

        Raises:
            Exception: when more than one live surk matches *repcode*.
        """
        if doc is None:
            doc = self.load(emad)
        # # get the live repcode surk
        # repcode goes in as an XPath variable, so quotes in it are harmless
        live_xp = '/n:profile/n:live/n:surk[@repcode=$repcode]'
        live_surk_eles = self._ns_xpath(doc, live_xp, repcode=repcode)
        if len(live_surk_eles) > 1:
            err = "I can't have more than 1 live surk per repcode."
            raise Exception(err)
        if len(live_surk_eles) == 0:
            return doc
        live_surk_ele = live_surk_eles[0]
        surk_copy = copy.deepcopy(live_surk_ele)
        surk_copy.attrib['until'] = datetime.today().strftime(self.const['tf'])
        ## remove the kisu
        surk_copy.attrib.pop('id', None)
        live_surk_ele.getparent().remove(live_surk_ele)
        # # append to the dead surks
        dead_eles = self._ns_xpath(doc, '/n:profile/n:dead')
        if len(dead_eles) == 0:
            # # create dead element
            profile_ele = self._ns_xpath(doc, '/n:profile')[0]
            dead_ele = self.E(self.N + 'dead')
            profile_ele.append(dead_ele)
        else:
            dead_ele = dead_eles[0]
        dead_ele.append(surk_copy)
        if not dont_write:
            self.write(doc)
        return doc

    def register(self, event, do_write=True):
        """Apply an ``add`` or ``del`` event to the profile of its emad.

        The event is a mapping read for the keys ``repcode``, ``emad`` and
        ``act``. The document before and after the change, plus the event,
        are collected in self.to_print and printed once the change counts
        as written.

        Args:
            event: the event mapping.
            do_write: when false, the document is not written but the
                collected text is still printed.

        Returns:
            None in every case.
        """
        self.surks = Surks()
        self.surks.event = event
        if 'repcode' not in event:
            print("profile sees an event without repcode" + str(event),
                  file=sys.stderr)
            return None
        emad = event['emad']
        doc = self.load(emad)
        self.to_print = etree.tostring(doc, pretty_print=True).decode()
        self.to_print += str(event) + '\n'
        if event['act'] == 'add':
            doc = self.surks.register_add(doc, event)
            if doc is None:
                return None
        if event['act'] == 'del':
            doc = self.surks.register_del(doc, event)
            if doc is None:
                return None
        self.to_print += etree.tostring(doc, pretty_print=True).decode()
        has_it_been_written = True
        if do_write:
            has_it_been_written = self.write(doc)
        if has_it_been_written:
            print(self.to_print)
        self.to_print = ''

    def sub_fields(self, surk_ele, field_name):
        """values such as name and hopa, that are associated with a surk

        Copies self.event[field_name] into the child element of that name
        under *surk_ele*, creating the child when it is absent. Nothing
        happens when the event has no such field.

        Returns:
            *surk_ele*.

        Raises:
            Exception: when *surk_ele* has several children of that name.
        """
        if field_name not in self.event:
            return surk_ele
        field_eles = self._ns_xpath(surk_ele, './n:' + field_name)
        if len(field_eles) > 1:
            raise Exception(f"No more than one subfield {field_name}")
        if len(field_eles) == 0:
            field_ele = etree.SubElement(surk_ele, self.N + field_name)
        else:
            field_ele = field_eles[0]
        field_ele.text = self.event[field_name]
        return surk_ele

    def emad_from_doc(self, doc):
        """Return the first ``@emad`` attribute value found on *doc*.

        Raises:
            IndexError: when the XPath ``@emad`` matches nothing.
        """
        return self._ns_xpath(doc, '@emad')[0]

    def last_death_time(self, doc, repcode):
        """Return the ``until`` value of the last dead surk for *repcode*.

        Returns:
            The attribute value, or None when no dead surk matches.
        """
        xp = '/n:profile/n:dead/n:surk[@repcode=$repcode]'
        surk_eles = self._ns_xpath(doc, xp, repcode=repcode)
        if len(surk_eles) == 0:
            return None
        # # assumed to be in historical order
        return surk_eles[-1].attrib['until']

    def repcodes(self, doc):
        """Return a dict whose keys are the repcodes of the live surks.

        Every value is 1; live surks without a ``repcode`` attribute are
        skipped.
        """
        surk_eles = self._ns_xpath(doc, '/n:profile/n:live/n:surk')
        # use a dict for fast lookup
        return {surk_ele.attrib['repcode']: 1
                for surk_ele in surk_eles
                if 'repcode' in surk_ele.attrib}

    def bounced(self, emad, from_fufi=None):
        """Mark the profile of *emad* as bounced and write it.

        The root element gets a ``bounced`` timestamp, Kapro.delete is
        called for every live repcode, and the document is written.

        Args:
            emad: the emad that bounced.
            from_fufi: read the document from this file instead of the
                regular profile file of *emad*.

        Returns:
            None; a message is printed when there is no profile file.
        """
        from kapro import Kapro
        kapro = Kapro()
        if from_fufi is not None:
            doc = self._parse_fufi(from_fufi)
        else:
            doc = self.load(emad, check_there=True)
        if doc is None:
            print(f'profile has no file for "{emad}" to bounce.')
            return None
        profile_ele = doc.getroot()
        profile_ele.attrib['bounced'] = \
            datetime.today().strftime(self.const['tf'])
        for repcode in self.repcodes(doc):
            kapro.delete(repcode, emad)
        self.write(doc, emad=emad)

    def move_emad(self, from_emad, to_emad, add_ele=None):
        """Move the profile of *from_emad* to *to_emad*.

        Args:
            from_emad: emad whose profile file must exist.
            to_emad: target emad; it must not have a profile file yet.
            add_ele: optional element appended to the root before writing.

        Returns:
            The moved document, or None when the source is missing, the
            target already exists, or either emad is not valid.
        """
        from_fufi = self.fufi_from_emad(from_emad)
        # a None fufi means from_emad is not a valid emad
        if from_fufi is None or not os.path.isfile(from_fufi):
            print(f"profile can't move from {from_emad}")
            return None
        doc = self.load(from_emad)
        # # if the target already exist, let's quit
        to_fufi = self.fufi_from_emad(to_emad)
        if to_fufi is None:
            # relfi_from_emad has already reported the invalid emad
            return None
        if os.path.isfile(to_fufi):
            print(f"profile sees {to_fufi}, end")
            return None
        # # change the kapro
        from kapro import Kapro
        kapro = Kapro()
        for repcode in self.repcodes(doc):
            kapro.delete(repcode, from_emad)
        root = doc.getroot()
        root.attrib['emad'] = to_emad
        self.s.emad = to_emad
        self.s.add_kisus(doc)
        if add_ele is not None:
            root.append(add_ele)
        if self.do_verbose:
            print(docing.show(doc))
        self.write(doc)
        # look the source path up again before removing the old file
        from_fufi = self.fufi_from_emad(from_emad)
        if os.path.isfile(from_fufi):
            os.remove(from_fufi)
        return doc

    def sign_on(self, emad, reports, spro='v', sint='s', is_test=False,
                only_virgin=True):
        """Add every repcode in *reports* to the profile of *emad*.

        A skeleton profile is used when *emad* has no file. The keyword
        arguments are passed through to Surks.add unchanged.

        Returns:
            None; nothing is done when *emad* is not a valid emad.
        """
        fufi = self.fufi_from_emad(emad)
        if fufi is None:
            # relfi_from_emad has already reported the invalid emad
            return None
        if os.path.isfile(fufi):
            doc = self.load(emad)
        else:
            doc = self.skel(emad)
        for repcode in reports:
            doc = self.s.add(doc, repcode, spro=spro, sint=sint,
                             is_test=is_test, only_virgin=only_virgin)
        self.s.emad = emad
        self.s.doc = doc
        self.s.add_kisus(doc)
        if self.do_verbose:
            print(docing.show(doc))
        self.write(doc, emad=emad)

    def live_surks(self, doc):
        """Return the list of live surk elements in *doc*."""
        return self._ns_xpath(doc, '/n:profile/n:live/n:surk')

    def live_surk(self, doc, repcode):
        """Return the first live surk for *repcode*, or None if there is none."""
        # repcode is compared as a string value through an XPath variable
        xp = '/n:profile/n:live/n:surk[@repcode=$repcode]'
        outs = self._ns_xpath(doc, xp, repcode=repcode)
        if len(outs) == 0:
            return None
        return outs[0]

    def inject(self, emad, in_ele):
        """inject document into profile, used by nixer"""
        # NOTE(review): the document is changed in memory only; nothing
        # here writes or returns it -- confirm this is intended.
        doc = self.load(emad)
        profile_ele = doc.getroot()
        profile_ele.append(in_ele)
        print(str(profile_ele))