import base64
import copy
import hashlib
import os
import sys
from datetime import datetime

import lxml.etree as etree
from lxml.builder import ElementMaker

import docing
from nitpo import Nitpo
from xpaths import Xpaths


class Surks(Nitpo):
    """Subscription ("surk") handling on profile XML documents.

    The methods read and edit ``surk`` elements that sit under the ``live``
    and ``dead`` children of a ``profile`` element.  XPath expressions use
    the ``n:`` prefix and are resolved through ``self.const['nsmap']``.

    NOTE(review): ``self.const``, ``self.now()`` and ``self.is_dev()`` are
    not defined in this file; presumably they come from ``Nitpo``.
    """

    def __init__(self, do_verbose=False):
        """subscriptions"""
        super().__init__()
        # Clark-notation namespace prefix, e.g. "{ns}" + "surk".
        self.N = "{%s}" % self.const['ns']
        # Element factory whose default namespace is the project namespace.
        self.E = ElementMaker(nsmap={None: self.const['ns']})
        self.urdate = None
        self.emad = None
        # Event currently being applied; read by sub_fields().
        self.event = None
        self.xpaths = Xpaths()
        # Last element located by set_ele().
        self.ele = None
        self.do_verbose = do_verbose

    def new(self, event):
        """Build a detached ``surk`` element from an event dict.

        Reads ``event['repcode']`` and ``event['time']``; ``event['name']``
        is optional and becomes a ``name`` child.

        NOTE(review): the tags are created without ``self.N``, unlike in
        register_add(), so they are not namespace-qualified -- confirm this
        is intended.
        """
        surk_ele = etree.Element('surk', nsmap=self.const['nsmap'])
        surk_ele.attrib['repcode'] = event['repcode']
        surk_ele.attrib['spro'] = 'v'
        surk_ele.attrib['from'] = event['time']
        if 'name' in event:
            name_ele = etree.SubElement(surk_ele, 'name',
                                        nsmap=self.const['nsmap'])
            name_ele.text = event['name']
        return surk_ele

    def has_it_live(self, doc, repcode):
        """Return True when a live surk with this repcode is found."""
        xp = "//n:live/n:surk[@repcode='" + repcode + "']"
        return self.xpaths.none_or_one(doc, xp) is not None

    def has_it_any_live(self, doc):
        """Return whatever ``xpaths.has_it`` reports for any live surk."""
        xp = "//n:live/n:surk"
        return self.xpaths.has_it(doc, xp)

    def has_it_dead(self, doc, repcode):
        """Return whatever ``xpaths.has_it`` reports for a dead surk."""
        xp = "//n:dead/n:surk[@repcode='" + repcode + "']"
        return self.xpaths.has_it(doc, xp)

    def death_datim(self, doc, repcode):
        """Return the ``until`` attribute of a dead surk for ``repcode``.

        The element is chosen by ``xpaths.one_or_last``.  When it carries no
        ``until`` attribute a message is written to stderr and None is
        returned.
        """
        xp = "//n:dead/n:surk[@repcode='" + repcode + "']"
        found = self.xpaths.one_or_last(doc, xp)
        if 'until' not in found.attrib:
            err = docing.show(doc)
            print(f"surks: no until for {repcode} in {err}", file=sys.stderr)
            return None
        return found.attrib['until']

    def birth_datim(self, doc, repcode):
        """Return the ``from`` attribute of the live surk for ``repcode``.

        Returns None when there is no such surk, or (after a message on
        stderr) when the surk has no ``from`` attribute.
        """
        xp = "//n:live/n:surk[@repcode='" + repcode + "']"
        found = self.xpaths.none_or_one(doc, xp)
        if found is None:
            return None
        if 'from' not in found.attrib:
            err = docing.show(doc)
            print(f"surks: no from for {repcode} in {err}", file=sys.stderr)
            return None
        return found.attrib['from']

    def live_ele(self, doc):
        """Return the ``/n:profile/n:live`` element, or None if absent.

        ``xpaths.none_or_one`` is used everywhere else in this class as
        returning either an element or None, so its result is returned
        directly.  The previous ``len(got)`` / ``got[0]`` handling raised
        TypeError on None and otherwise returned the first child of the
        live element instead of the live element itself.
        """
        xp = "/n:profile/n:live"
        return self.xpaths.none_or_one(doc, xp)

    def death_time(self, doc, repcode):
        """last time a death occured, not sure if needed

        Returns None when there is no dead surk for ``repcode``; otherwise
        the greatest ``until`` value (string comparison), or ``''`` when no
        dead surk has an ``until`` attribute.
        """
        xp = "//n:dead/n:surk[@repcode='" + repcode + "']"
        got = doc.xpath(xp, namespaces=self.const['nsmap'])
        if not got:
            return None
        untils = (ele.attrib['until'] for ele in got if 'until' in ele.attrib)
        return max(untils, default='')

    def is_test(self, doc, repcode):
        """Return True when the live surk for ``repcode`` has a ``test``
        attribute, False when it has none or there is no such surk."""
        xp = "//n:live/n:surk[@repcode='" + repcode + "']"
        got = self.xpaths.none_or_one(doc, xp)
        if got is None:
            return False
        return 'test' in got.attrib

    def name_ele(self, doc, repcode):
        """Return the ``name`` child of the live surk for ``repcode``.

        Returns False (not None) when no such element exists; callers may
        rely on that, so it is kept.  Raises Exception when more than one
        element matches.
        """
        xp = "//n:live/n:surk[@repcode='" + repcode + "']/n:name"
        got = doc.xpath(xp, namespaces=self.const['nsmap'])
        if not got:
            return False
        if len(got) > 1:
            raise Exception("I can't have several live subscriptions for "
                            + repcode)
        return got[0]

    def test_kisu(self, doc, repcode):
        """uses the urdate to keep the kisu the same over time"""
        urdate = self.get_urdate(doc)
        return self.calc_kisu(self.emad, urdate, repcode)

    def kisu(self, doc, repcode):
        """Compute the kisu for ``repcode`` from the document.

        Test surks are keyed on the urdate (see get_urdate()), all others on
        the ``from`` value of the live surk.  ``self.emad`` must have been
        set beforehand, as add_kisus() does.
        """
        if self.is_test(doc, repcode):
            return self.test_kisu(doc, repcode)
        birth = self.birth_datim(doc, repcode)
        return self.calc_kisu(self.emad, birth, repcode)

    def calc_kisu(self, in1, in2, in3):
        """Return a 22-character token derived from three strings.

        The inputs are concatenated, MD5-hashed and URL-safe base64 encoded.
        A 16-byte digest always encodes to 24 characters ending in ``==``;
        the final slice drops that padding.  All three inputs must be
        strings: a None (e.g. from birth_datim()) raises TypeError.
        """
        string = (in1 + in2 + in3).encode()
        digest = hashlib.md5(string).digest()
        out = base64.urlsafe_b64encode(digest).decode()
        return out[:-2]

    def live_surks_check(self, doc):
        """Exit the process if two live surks share a repcode.

        Surks without a ``repcode`` attribute are reported on stderr and
        skipped; previously the code fell through to the attribute lookup
        and raised KeyError right after printing the message.
        """
        xp = '/n:profile/n:live/n:surk'
        live_surk_eles = self.xpaths.run(doc, xp)
        seen = set()
        for live_surk_ele in live_surk_eles:
            if 'repcode' not in live_surk_ele.attrib:
                print("surks: no repcode on surk", file=sys.stderr)
                continue
            repcode = live_surk_ele.attrib['repcode']
            if repcode in seen:
                out = docing.show(doc)
                # Show the rendered document, not the object's repr.
                print(f"duplicate live in\n{out}")
                # sys.exit() raises the same SystemExit as quit() but does
                # not depend on the site module being loaded.
                sys.exit()
            seen.add(repcode)

    def register_del(self, doc, event):
        """Apply an unsubscribe event to the document.

        Reads ``event['repcode']`` and ``event['time']``; ``event['via']``
        is optional.

        * With exactly one live surk for the repcode: if
          ``dating.is_it_a_close_date(from, time)`` holds, the surk gets an
          ``until`` attribute, is removed from ``live`` and a copy is
          appended to ``dead`` (created if missing); ``doc`` is returned.
          Otherwise a message goes to stderr and None is returned.
        * With no live surk: the dead surks for the repcode are scanned,
          last first, to correct an ``until`` date from historic data.

        Returns ``doc`` when something was changed, None otherwise.  Raises
        Exception when more than one live surk matches.
        """
        # Imported here as in the original; presumably to avoid an import
        # cycle -- TODO confirm.
        import dating
        self.event = event
        repcode = event['repcode']
        nsmap = self.const['nsmap']

        # get the live repcode surk
        live_xp = '/n:profile/n:live/n:surk[@repcode="' + repcode + '"]'
        live_surk_eles = doc.xpath(live_xp, namespaces=nsmap)
        if len(live_surk_eles) > 1:
            err = "I can't have more than 1 live surk per repcode."
            raise Exception(err)

        if len(live_surk_eles) == 1:
            live_surk_ele = live_surk_eles[0]
            # normal case
            if not dating.is_it_a_close_date(live_surk_ele.attrib['from'],
                                             event['time']):
                print("profile can't remove a surk from before it started",
                      file=sys.stderr)
                return None
            live_surk_ele.attrib['until'] = event['time']
            surk_copy = copy.deepcopy(live_surk_ele)
            if 'via' in event:
                self.sub_fields(surk_copy, 'via')
            # remove from live surks
            live_surk_ele.getparent().remove(live_surk_ele)
            # append to the dead surks
            dead_eles = doc.xpath('/n:profile/n:dead', namespaces=nsmap)
            if dead_eles:
                dead_ele = dead_eles[0]
            else:
                # create dead element
                profile_ele = doc.xpath('/n:profile', namespaces=nsmap)[0]
                dead_ele = self.E(self.N + 'dead')
                profile_ele.append(dead_ele)
            dead_ele.append(surk_copy)
            return doc

        # make some fixes for historic data
        dead_xp = '/n:profile/n:dead/n:surk[@repcode="' + repcode + '"]'
        dead_surk_eles = doc.xpath(dead_xp, namespaces=nsmap)
        if not dead_surk_eles:
            print('dead_xp ' + dead_xp)
            print("profile want to change a death date but there is no death.")
            return None
        # starting with the last
        for dead_surk_ele in reversed(dead_surk_eles):
            # in log records, we have events that are earlier than the time
            # in the historic data
            if dead_surk_ele.attrib['until'] == event['time']:
                return None
            if dating.is_it_a_close_date(dead_surk_ele.attrib['until'],
                                         event['time']):
                dead_surk_ele.attrib['until'] = event['time']
                self.sub_fields(dead_surk_ele, 'via')
                return doc
        # nothing got done
        return None

    def register_add(self, doc, event, spro='v'):
        """used for mailman converter

        Apply a subscribe event.  Reads ``event['repcode']`` and
        ``event['time']``; ``event['spro']`` overrides the ``spro``
        argument; ``name``, ``hopa`` and ``via`` are copied into child
        elements when present.  An existing live surk for the repcode is
        updated in place, otherwise a new one is appended to ``live``
        (created if missing).  Returns ``doc``.  Raises Exception when more
        than one live surk matches.
        """
        # Imported here as in the original; presumably to avoid an import
        # cycle -- TODO confirm.
        import dating
        self.event = event
        repcode = event['repcode']
        nsmap = self.const['nsmap']

        xp = '/n:profile/n:live/n:surk[@repcode="' + repcode + '"]'
        surk_eles = doc.xpath(xp, namespaces=nsmap)
        if len(surk_eles) > 1:
            err = "I can't have more than 1 live surk per repcode."
            raise Exception(err)
        do_append = len(surk_eles) == 0
        if do_append:
            surk_ele = etree.Element(self.N + 'surk')
            surk_ele.attrib['repcode'] = event['repcode']
        else:
            surk_ele = surk_eles[0]

        if 'spro' in event:
            spro = event['spro']
        surk_ele.attrib['spro'] = spro

        # set time on the surk, if it's a close only
        if 'from' not in surk_ele.attrib or \
           dating.is_it_a_close_date(surk_ele.attrib['from'], event['time']):
            surk_ele.attrib['from'] = event['time']
        for sub_field in ('name', 'hopa', 'via'):
            self.sub_fields(surk_ele, sub_field)

        # Relative path, evaluated against ``doc`` itself.
        live_eles = doc.xpath('n:live', namespaces={'n': self.const['ns']})
        if live_eles:
            live_ele = live_eles[0]
        else:
            # create live element
            profile_ele = doc.xpath('/n:profile', namespaces=nsmap)[0]
            live_ele = self.E(self.N + 'live')
            profile_ele.append(live_ele)
        if do_append:
            live_ele.append(surk_ele)
        return doc

    def sub_fields(self, surk_ele, field_name):
        """values such as name and hopa, that are associated with a surk

        Copies ``self.event[field_name]`` into the ``field_name`` child of
        ``surk_ele``, creating the child when missing.  Does nothing when
        the event has no such key.  Returns ``surk_ele``.  Raises Exception
        when the surk already has more than one such child.
        """
        if field_name not in self.event:
            return surk_ele
        xp = './n:' + field_name
        field_eles = surk_ele.xpath(xp, namespaces=self.const['nsmap'])
        if len(field_eles) > 1:
            # f-prefix added: the field name used to be printed literally
            # as "{field_name}".
            raise Exception(f"No more that one subfield {field_name}")
        if field_eles:
            field_ele = field_eles[0]
        else:
            field_ele = etree.SubElement(surk_ele, self.N + field_name)
        field_ele.text = self.event[field_name]
        return surk_ele

    def add_kisus(self, doc):
        """Write the kisu into the ``id`` attribute of every live surk.

        Surks with ``spro="a"`` are skipped.  For a surk that has a ``test``
        attribute and already had an ``id``, a changed id raises Exception
        unless the surk's ``from`` equals the urdate.  Sets ``self.emad``
        (and ``self.urdate`` for test surks).  Returns ``doc``.
        """
        xp = '/n:profile/n:live/n:surk'
        self.emad = self.get_emad(doc)
        for surk_ele in doc.xpath(xp, namespaces=self.const['nsmap']):
            attrib = surk_ele.attrib
            # no kisu for admin
            if attrib.get('spro') == 'a':
                continue
            is_test = 'test' in attrib
            if is_test:
                self.urdate = self.get_urdate(doc)
            repcode = attrib['repcode']
            kisu = self.kisu(doc, repcode)
            old_kisu = attrib.get('id')
            attrib['id'] = kisu
            if (is_test and old_kisu is not None
                    and attrib['id'] != old_kisu
                    and attrib['from'] != self.urdate):
                raise Exception('kisu should be static for tests')
        return doc

    def add(self, doc, repcode, spro='v', sint='s', is_test=False,
            only_virgin=False):
        """Add a live surk for ``repcode`` unless one already exists.

        With ``only_virgin`` set, nothing is added when a dead surk for the
        repcode exists.  The ``live`` element is created when missing.  The
        new surk gets ``repcode`` and ``from`` (``self.now()``), ``test="y"``
        when ``is_test`` is True, and ``spro`` / ``sint`` when they hold one
        of the recognised values ('a'/'v' and 's'/'d').  Returns ``doc`` in
        every case.
        """
        xp = '/n:profile/n:live/n:surk[@repcode="' + repcode + '"]'
        if self.xpaths.none_or_one(doc, xp) is not None:
            return doc
        xp = '/n:profile/n:live'
        live_ele = self.xpaths.none_or_one(doc, xp)
        if only_virgin and self.has_it_dead(doc, repcode):
            print(f"surks: {repcode} is not vigin.")
            return doc
        if live_ele is None:
            profile_ele = self.xpaths.one(doc, '/n:profile')
            etree.SubElement(profile_ele, self.N + 'live')
            # Re-fetch through the document, as the original did.
            live_ele = self.xpaths.one(doc, xp)
        surk_ele = etree.SubElement(live_ele, self.N + 'surk')
        surk_ele.attrib['repcode'] = repcode
        surk_ele.attrib['from'] = self.now()
        if is_test is True:
            surk_ele.attrib['test'] = 'y'
        # The order below fixes the attribute order in the serialised XML.
        if spro == 'a':
            surk_ele.attrib['spro'] = 'a'
        if sint in ('s', 'd'):
            surk_ele.attrib['sint'] = sint
        if spro == 'v':
            surk_ele.attrib['spro'] = 'v'
        return doc

    def sint(self, surk_ele):
        """Return the surk's ``sint``; when absent, derive it from ``spro``
        ('a' gives 'd', anything else the default 's')."""
        attrib = surk_ele.attrib
        if 'sint' in attrib:
            return attrib['sint']
        if 'spro' in attrib and attrib['spro'] == 'a':
            return 'd'
        # just return the default
        return 's'

    def spro(self, surk_ele):
        """Return the surk's ``spro``; when absent, derive it from ``sint``
        ('d' gives 'a', anything else the default 'v')."""
        attrib = surk_ele.attrib
        if 'spro' in attrib:
            return attrib['spro']
        if 'sint' in attrib and attrib['sint'] == 'd':
            return 'a'
        # just return the default
        return 'v'

    def set_ele(self, doc, repcode, emad):
        """Locate a surk, store it in ``self.ele`` and return it.

        Three XPaths are tried in turn; the first that matches exactly one
        element wins:

        1. the live surk for ``repcode`` one level below the root
           (``/*/n:profile/...``, "in document data"),
        2. the live surk for ``repcode`` directly under ``/n:profile``,
        3. any surk with a matching ``emad`` attribute (omnibus case).

        Multiple matches are reported on stderr and treated as a miss.  When
        nothing matches a message is printed and, if ``self.is_dev()`` is
        true, the process exits; otherwise None is returned implicitly.
        """
        nsmap = self.const['nsmap']
        # this is wasteful
        xpb = "/n:profile/n:live/n:surk[@repcode='" + repcode + "']"
        # in document data first, then the bare profile
        for xp in ("/*" + xpb, xpb):
            eles = doc.xpath(xp, namespaces=nsmap)
            if len(eles) == 1:
                self.ele = eles[0]
                return self.ele
            if len(eles) > 1:
                print(f"surks sees multiples {repcode}", file=sys.stderr)
        # omnibus case
        xp = "//n:surk[@emad='" + emad + "']"
        eles = doc.xpath(xp, namespaces=nsmap)
        if len(eles) > 1:
            print(f"surks sees multiples {emad}", file=sys.stderr)
        if len(eles) == 1:
            self.ele = eles[0]
            return self.ele
        print(f"surks sees no surk for {repcode} or {emad} ", file=sys.stderr)
        if self.is_dev():
            # Same SystemExit as quit(), without relying on the site module.
            sys.exit()

    def get_emad(self, surk_ele):
        """Return the ``emad`` attribute of the given element.

        When it is missing, the serialised element is written to stderr and
        None is returned.
        """
        emads = surk_ele.xpath('@emad', namespaces=self.const['nsmap'])
        if emads:
            return emads[0]
        print_surk_ele = etree.tostring(surk_ele, pretty_print=True).decode()
        print_err = "surks: no emad in \n" + print_surk_ele
        print(print_err, file=sys.stderr)
        return None

    def get_urdate(self, doc):
        """first date among all surks, to keep kisus for test surks

        Returns the smallest ``from`` value (string comparison) of the surks
        under ``/n:profile/*``, capped at the current time formatted with
        ``self.const['tf']``.  Returns None, after a message on stderr, when
        no surk has a ``from`` attribute.
        """
        xp = '/n:profile/*/n:surk/@from'
        froms = doc.xpath(xp, namespaces=self.const['nsmap'])
        if not froms:
            show = docing.show(doc)
            print(f"no urdate in \n\n{show}", file=sys.stderr)
            return None
        # The current time goes first so that it wins a tie, as before.
        today = datetime.today().strftime(self.const['tf'])
        return min([today, *froms])

    def get_fidate(self, doc):
        """last date among all surks and bounced

        Collects the profile's ``bounced`` attribute and every ``from`` /
        ``until`` of the surks, both under ``/n:profile`` and under
        ``/n:web/n:profile[1]``, and returns the greatest value (string
        comparison).  Returns None, after a message on stderr, when there is
        no date at all.
        """
        nsmap = self.const['nsmap']
        dates = []
        bounced = doc.xpath('/n:profile/@bounced', namespaces=nsmap)
        if bounced:
            dates.append(bounced[0])
        # plain profile first, then the web element
        for base in ('/n:profile/*/n:surk/@',
                     '/n:web/n:profile[1]/*/n:surk/@'):
            for bound in ('from', 'until'):
                dates.extend(doc.xpath(base + bound, namespaces=nsmap))
        if not dates:
            show = docing.show(doc)
            print(f"no date in \n\n{show}", file=sys.stderr)
            return None
        return max(dates)