前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 辅助工具类 邮件收发 常用

python 辅助工具类 邮件收发 常用

作者头像
用户5760343
发布2022-05-13 09:41:09
6060
发布2022-05-13 09:41:09
举报
文章被收录于专栏:sktj

-----------------------------mailconfig.py popservername = 'pop.secureserver.net' popusername = 'PP4E@learning-python.com' smtpservername = 'smtpout.secureserver.net' myaddress = 'PP4E@learning-python.com' mysignature = ('Thanks,\n' '--Mark Lutz (http://learning-python.com, http://rmi.net/~lutz)') smtpuser = None # per your ISP smtppasswdfile = '' # set to '' to be asked poppasswdfile = r'c:\temp\pymailgui.txt' # set to '' to be asked sentmailfile = r'.\sentmail.txt' # . means in current working dir savemailfile = r'c:\temp\savemail.txt' # not used in PyMailGUI: dialog fetchEncoding = 'utf8' # 4E: how to decode and store message text (or latin1?) headersEncodeTo = None # 4E: how to encode non-ASCII headers sent (None=utf8) fetchlimit = 25 # 4E: maximum number headers/emails to fetch on loads ------------------------------------------------mailTool.py class MailTool: # superclass for all mail tools def trace(self, message): # redef me to disable or log to file print(message)

class SilentMailTool: # to mixin instead of subclassing def trace(self, message): pass ----------------------------------------mailSender.py

import mailconfig # client's mailconfig import smtplib, os, mimetypes # mime: name to type import email.utils, email.encoders # date string, base64 from .mailTool import MailTool, SilentMailTool # 4E: package-relative

from email.message import Message # general message, obj->text from email.mime.multipart import MIMEMultipart # type-specific messages from email.mime.audio import MIMEAudio # format/encode attachments from email.mime.image import MIMEImage from email.mime.text import MIMEText from email.mime.base import MIMEBase from email.mime.application import MIMEApplication # 4E: use new app class

def fix_encode_base64(msgobj): linelen = 76 # per MIME standards from email.encoders import encode_base64

代码语言:javascript
复制
encode_base64(msgobj)                # what email does normally: leaves bytes
text = msgobj.get_payload()          # bytes fails in email pkg on text gen
if isinstance(text, bytes):          # payload is bytes in 3.1, str in 3.2 alpha
    text = text.decode('ascii')      # decode to unicode str so text gen works

lines = []                           # split into lines, else 1 massive line
text  = text.replace('\n', '')       # no \n present in 3.1, but futureproof me!
while text:
    line, text = text[:linelen], text[linelen:]
    lines.append(line)
msgobj.set_payload('\n'.join(lines))

def fix_text_required(encodingname): from email.charset import Charset, BASE64, QP

代码语言:javascript
复制
charset = Charset(encodingname)   # how email knows what to do for encoding
bodyenc = charset.body_encoding   # utf8, others require bytes input data
return bodyenc in (None, QP)      # ascii, latin1, others require str

class MailSender(MailTool): def init(self, smtpserver=None, tracesize=256): self.smtpServerName = smtpserver or mailconfig.smtpservername self.tracesize = tracesize

代码语言:javascript
复制
def sendMessage(self, From, To, Subj, extrahdrs, bodytext, attaches,
                                  saveMailSeparator=(('=' * 80) + 'PY\n'),
                                  bodytextEncoding='us-ascii',
                                  attachesEncodings=None):
    if fix_text_required(bodytextEncoding): 
        if not isinstance(bodytext, str):
            bodytext = bodytext.decode(bodytextEncoding)
    else:
        if not isinstance(bodytext, bytes):
            bodytext = bodytext.encode(bodytextEncoding)

    # make message root
    if not attaches:
        msg = Message()
        msg.set_payload(bodytext, charset=bodytextEncoding)
    else:
        msg = MIMEMultipart()
        self.addAttachments(msg, bodytext, attaches,
                                 bodytextEncoding, attachesEncodings)
    hdrenc = mailconfig.headersEncodeTo or 'utf-8'        # default=utf8
    Subj = self.encodeHeader(Subj, hdrenc)                # full header
    From = self.encodeAddrHeader(From, hdrenc)            # email names
    To   = [self.encodeAddrHeader(T, hdrenc) for T in To] # each recip
    Tos  = ', '.join(To)                                  # hdr+envelope

    # add headers to root
    msg['From']    = From
    msg['To']      = Tos                        # poss many: addr list
    msg['Subject'] = Subj                       # servers reject ';' sept
    msg['Date']    = email.utils.formatdate()   # curr datetime, rfc2822 utc
    recip = To
    for name, value in extrahdrs:               # Cc, Bcc, X-Mailer, etc.
        if value:
            if name.lower() not in ['cc', 'bcc']:
                value = self.encodeHeader(value, hdrenc)
                msg[name] = value
            else:
                value = [self.encodeAddrHeader(V, hdrenc) for V in value]
                recip += value                     # some servers reject ['']
                if name.lower() != 'bcc':          # 4E: bcc gets mail, no hdr
                    msg[name] = ', '.join(value)   # add commas between cc

    recip = list(set(recip))                       # 4E: remove duplicates
    fullText = msg.as_string()                     # generate formatted msg
    self.trace('Sending to...' + str(recip))
    self.trace(fullText[:self.tracesize])                # SMTP calls connect
    server = smtplib.SMTP(self.smtpServerName, timeout=20)  # this may fail too
    self.getPassword()                                   # if srvr requires
    self.authenticateServer(server)                      # login in subclass
    try:
        failed = server.sendmail(From, recip, fullText)  # except or dict
    except:
        server.close()                                   # 4E: quit may hang!
        raise                                            # reraise except
    else:
        server.quit()                                    # connect + send OK
    self.saveSentMessage(fullText, saveMailSeparator)    # 4E: do this first 
    if failed:
        class SomeAddrsFailed(Exception): pass
        raise SomeAddrsFailed('Failed addrs:%s\n' % failed)
    self.trace('Send exit')

def addAttachments(self, mainmsg, bodytext, attaches,
                                  bodytextEncoding, attachesEncodings):
    # add main text/plain part
    msg = MIMEText(bodytext, _charset=bodytextEncoding)
    mainmsg.attach(msg)

    # add attachment parts
    encodings = attachesEncodings or (['us-ascii'] * len(attaches))
    for (filename, fileencode) in zip(attaches, encodings):
        # filename may be absolute or relative
        if not os.path.isfile(filename):             # skip dirs, etc.
            continue

        # guess content type from file extension, ignore encoding
        contype, encoding = mimetypes.guess_type(filename)
        if contype is None or encoding is not None:  # no guess, compressed?
            contype = 'application/octet-stream'     # use generic default
        self.trace('Adding ' + contype)

        # build sub-Message of appropriate kind
        maintype, subtype = contype.split('/', 1)
        if maintype == 'text':                       # 4E: text needs encoding
            if fix_text_required(fileencode):        # requires str or bytes
                data = open(filename, 'r', encoding=fileencode)
            else:
                data = open(filename, 'rb')
            msg = MIMEText(data.read(), _subtype=subtype, _charset=fileencode)
            data.close()

        elif maintype == 'image':
            data = open(filename, 'rb')              # 4E: use fix for binaries
            msg  = MIMEImage(
                   data.read(), _subtype=subtype, _encoder=fix_encode_base64)
            data.close()

        elif maintype == 'audio':
            data = open(filename, 'rb')
            msg  = MIMEAudio(
                   data.read(), _subtype=subtype, _encoder=fix_encode_base64)
            data.close()

        elif maintype == 'application':              # new  in 4E
            data = open(filename, 'rb')
            msg  = MIMEApplication(
                   data.read(), _subtype=subtype, _encoder=fix_encode_base64)
            data.close()

        else:
            data = open(filename, 'rb')              # application/* could 
            msg  = MIMEBase(maintype, subtype)       # use this code too
            msg.set_payload(data.read())
            data.close()                             # make generic type
            fix_encode_base64(msg)                   # was broken here too!
           #email.encoders.encode_base64(msg)        # encode using base64

        # set filename and attach to container
        basename = os.path.basename(filename)
        msg.add_header('Content-Disposition',
                       'attachment', filename=basename)
        mainmsg.attach(msg)

    # text outside mime structure, seen by non-MIME mail readers
    mainmsg.preamble = 'A multi-part MIME format message.\n'
    mainmsg.epilogue = ''  # make sure message ends with a newline

def saveSentMessage(self, fullText, saveMailSeparator):
    try:
        sentfile = open(mailconfig.sentmailfile, 'a', 
                              encoding=mailconfig.fetchEncoding)    # 4E
        if fullText[-1] != '\n': fullText += '\n'
        sentfile.write(saveMailSeparator)
        sentfile.write(fullText)
        sentfile.close()
    except:
        self.trace('Could not save sent message')    # not a show-stopper

def encodeHeader(self, headertext, unicodeencoding='utf-8'):
    try:
        headertext.encode('ascii')
    except:
        try:
            hdrobj = email.header.make_header([(headertext, unicodeencoding)])
            headertext = hdrobj.encode()
        except:
            pass         # auto splits into multiple cont lines if needed
    return headertext    # smtplib may fail if it won't encode to ascii

def encodeAddrHeader(self, headertext, unicodeencoding='utf-8'):
    try:
        pairs = email.utils.getaddresses([headertext])   # split addrs + parts
        encoded = []
        for name, addr in pairs:
            try:
                name.encode('ascii')         # use as is if okay as ascii
            except UnicodeError:             # else try to encode name part
                try:
                    uni  = name.encode(unicodeencoding) 
                    hdr  = email.header.make_header([(uni, unicodeencoding)])
                    name = hdr.encode()
                except:
                    name = None              # drop name, use address part only
            joined = email.utils.formataddr((name, addr))  # quote name if need
            encoded.append(joined)

        fullhdr = ', '.join(encoded)
        if len(fullhdr) > 72 or '\n' in fullhdr:      # not one short line?
            fullhdr = ',\n '.join(encoded)            # try multiple lines
        return fullhdr
    except:
        return self.encodeHeader(headertext)

def authenticateServer(self, server):
    pass  # no login required for this server/class

def getPassword(self):
    pass  # no login required for this server/class

class MailSenderAuth(MailSender): smtpPassword = None # 4E: on class, not self, shared by poss N instances

代码语言:javascript
复制
def __init__(self, smtpserver=None, smtpuser=None):
    MailSender.__init__(self, smtpserver)
    self.smtpUser = smtpuser or mailconfig.smtpuser
    # self.smtpPassword = None   # 4E: makes pyMailGUI ask for each send!

def authenticateServer(self, server):
    server.login(self.smtpUser, self.smtpPassword)

def getPassword(self):
 
    if not self.smtpPassword:
        try:
            localfile = open(mailconfig.smtppasswdfile)
            MailSenderAuth.smtpPassword = localfile.readline()[:-1]     # 4E
            self.trace('local file password' + repr(self.smtpPassword))
        except:
            MailSenderAuth.smtpPassword = self.askSmtpPassword()        # 4E

def askSmtpPassword(self):
    assert False, 'Subclass must define method'

class MailSenderAuthConsole(MailSenderAuth): def askSmtpPassword(self): import getpass prompt = 'Password for %s on %s?' % (self.smtpUser, self.smtpServerName) return getpass.getpass(prompt)

class SilentMailSender(SilentMailTool, MailSender): pass # replaces trace ----------------------------------------------mailFetcher.py

import poplib, mailconfig, sys # client's mailconfig on sys.path print('user:', mailconfig.popusername) # script dir, pythonpath, changes

from .mailParser import MailParser # for headers matching (4E: .) from .mailTool import MailTool, SilentMailTool # trace control supers (4E: .)

index/server msgnum out of synch tests

class DeleteSynchError(Exception): pass # msg out of synch in del class TopNotSupported(Exception): pass # can't run synch test class MessageSynchError(Exception): pass # index list out of sync

class MailFetcher(MailTool): def init(self, popserver=None, popuser=None, poppswd=None, hastop=True): self.popServer = popserver or mailconfig.popservername self.popUser = popuser or mailconfig.popusername self.srvrHasTop = hastop self.popPassword = poppswd # ask later if None

代码语言:javascript
复制
def connect(self):
    self.trace('Connecting...')
    self.getPassword()                          # file, GUI, or console
    server = poplib.POP3(self.popServer, timeout=20)
    server.user(self.popUser)                   # connect,login POP server
    server.pass_(self.popPassword)              # pass is a reserved word
    self.trace(server.getwelcome())             # print returned greeting
    return server

fetchEncoding = mailconfig.fetchEncoding

def decodeFullText(self, messageBytes):
    text = None
    kinds =  [self.fetchEncoding]             # try user setting first
    kinds += ['ascii', 'latin1', 'utf8']      # then try common types
    kinds += [sys.getdefaultencoding()]       # and platform dflt (may differ)
    for kind in kinds:                        # may cause mail saves to fail
        try:
            text = [line.decode(kind) for line in messageBytes]
            break
        except (UnicodeError, LookupError):   # LookupError: bad name
            pass

    if text == None:
        # try returning headers + error msg, else except may kill client;
        # still try to decode headers per ascii, other, platform default;

        blankline = messageBytes.index(b'')
        hdrsonly  = messageBytes[:blankline]
        commons   = ['ascii', 'latin1', 'utf8']
        for common in commons:
            try:
                text = [line.decode(common) for line in hdrsonly] 
                break
            except UnicodeError:
                pass
        else:                                                  # none worked
            try:
                text = [line.decode() for line in hdrsonly]    # platform dflt?
            except UnicodeError:
                text = ['From: (sender of unknown Unicode format headers)']
        text += ['', '--Sorry: mailtools cannot decode this mail content!--']
    return text

def downloadMessage(self, msgnum):
    self.trace('load ' + str(msgnum))
    server = self.connect()
    try:
        resp, msglines, respsz = server.retr(msgnum)
    finally:
        server.quit()
    msglines = self.decodeFullText(msglines)   # raw bytes to Unicode str
    return '\n'.join(msglines)                 # concat lines for parsing

def downloadAllHeaders(self, progress=None, loadfrom=1):

    if not self.srvrHasTop:                    # not all servers support TOP
        # naively load full msg text
        return self.downloadAllMsgs(progress, loadfrom)  
    else:
        self.trace('loading headers')
        fetchlimit = mailconfig.fetchlimit
        server = self.connect()                # mbox now locked until quit
        try:
            resp, msginfos, respsz = server.list()   # 'num size' lines list
            msgCount = len(msginfos)                 # alt to srvr.stat[0]
            msginfos = msginfos[loadfrom-1:]         # drop already loadeds
            allsizes = [int(x.split()[1]) for x in msginfos]
            allhdrs  = []
            for msgnum in range(loadfrom, msgCount+1):          # poss empty
                if progress: progress(msgnum, msgCount)         # run callback
                if fetchlimit and (msgnum <= msgCount - fetchlimit):
                    # skip, add dummy hdrs
                    hdrtext = 'Subject: --mail skipped--\n\n' 
                    allhdrs.append(hdrtext)
                else:
                    # fetch, retr hdrs only
                    resp, hdrlines, respsz = server.top(msgnum, 0)
                    hdrlines = self.decodeFullText(hdrlines)
                    allhdrs.append('\n'.join(hdrlines))
        finally:
            server.quit()                          # make sure unlock mbox
        assert len(allhdrs) == len(allsizes)
        self.trace('load headers exit')
        return allhdrs, allsizes, False

def downloadAllMessages(self, progress=None, loadfrom=1):
    self.trace('loading full messages')
    fetchlimit = mailconfig.fetchlimit
    server = self.connect()
    try:
        (msgCount, msgBytes) = server.stat()          # inbox on server
        allmsgs  = []
        allsizes = []
        for i in range(loadfrom, msgCount+1):         # empty if low >= high
            if progress: progress(i, msgCount)
            if fetchlimit and (i <= msgCount - fetchlimit):
                # skip, add dummy mail
                mailtext = 'Subject: --mail skipped--\n\nMail skipped.\n' 
                allmsgs.append(mailtext)
                allsizes.append(len(mailtext))
            else:
                # fetch, retr full mail
                (resp, message, respsz) = server.retr(i)  # save text on list
                message = self.decodeFullText(message)
                allmsgs.append('\n'.join(message))        # leave mail on server
                allsizes.append(respsz)                   # diff from len(msg)
    finally:
        server.quit()                                     # unlock the mail box
    assert len(allmsgs) == (msgCount - loadfrom) + 1      # msg nums start at 1
   #assert sum(allsizes) == msgBytes                      # not if loadfrom > 1
    return allmsgs, allsizes, True                        # not if fetchlimit

def deleteMessages(self, msgnums, progress=None):
    self.trace('deleting mails')
    server = self.connect()
    try:
        for (ix, msgnum) in enumerate(msgnums):   # don't reconnect for each
            if progress: progress(ix+1, len(msgnums))
            server.dele(msgnum)
    finally:                                      # changes msgnums: reload
        server.quit()

def deleteMessagesSafely(self, msgnums, synchHeaders, progress=None):
    if not self.srvrHasTop:
        raise TopNotSupported('Safe delete cancelled')

    self.trace('deleting mails safely')
    errmsg  = 'Message %s out of synch with server.\n'
    errmsg += 'Delete terminated at this message.\n'
    errmsg += 'Mail client may require restart or reload.'

    server = self.connect()                       # locks inbox till quit
    try:                                          # don't reconnect for each
        (msgCount, msgBytes) = server.stat()      # inbox size on server
        for (ix, msgnum) in enumerate(msgnums):
            if progress: progress(ix+1, len(msgnums))
            if msgnum > msgCount:                            # msgs deleted
                raise DeleteSynchError(errmsg % msgnum)
            resp, hdrlines, respsz = server.top(msgnum, 0)   # hdrs only
            hdrlines = self.decodeFullText(hdrlines)
            msghdrs = '\n'.join(hdrlines)
            if not self.headersMatch(msghdrs, synchHeaders[msgnum-1]):
                raise DeleteSynchError(errmsg % msgnum)
            else:
                server.dele(msgnum)               # safe to delete this msg
    finally:                                      # changes msgnums: reload
        server.quit()                             # unlock inbox on way out

def checkSynchError(self, synchHeaders):
    self.trace('synch check')
    errormsg  = 'Message index out of synch with mail server.\n'
    errormsg += 'Mail client may require restart or reload.'
    server = self.connect()
    try:
        lastmsgnum = len(synchHeaders)                      # 1..N
        (msgCount, msgBytes) = server.stat()                # inbox size
        if lastmsgnum > msgCount:                           # fewer now?
            raise MessageSynchError(errormsg)               # none to cmp
        if self.srvrHasTop:
            resp, hdrlines, respsz = server.top(lastmsgnum, 0)  # hdrs only
            hdrlines = self.decodeFullText(hdrlines)
            lastmsghdrs = '\n'.join(hdrlines)
            if not self.headersMatch(lastmsghdrs, synchHeaders[-1]):
                raise MessageSynchError(errormsg)
    finally:
        server.quit()

def headersMatch(self, hdrtext1, hdrtext2):
    # try match by simple string compare
    if hdrtext1 == hdrtext2:
        self.trace('Same headers text')
        return True

    # try match without status lines
    split1 = hdrtext1.splitlines()       # s.split('\n'), but no final ''
    split2 = hdrtext2.splitlines()
    strip1 = [line for line in split1 if not line.startswith('Status:')]
    strip2 = [line for line in split2 if not line.startswith('Status:')]
    if strip1 == strip2:
        self.trace('Same without Status')
        return True

    # try mismatch by message-id headers if either has one
    msgid1 = [line for line in split1 if line[:11].lower() == 'message-id:']
    msgid2 = [line for line in split2 if line[:11].lower() == 'message-id:']
    if (msgid1 or msgid2) and (msgid1 != msgid2):
        self.trace('Different Message-Id')
        return False

    # try full hdr parse and common headers if msgid missing or trash
    tryheaders  = ('From', 'To', 'Subject', 'Date')
    tryheaders += ('Cc', 'Return-Path', 'Received')
    msg1 = MailParser().parseHeaders(hdrtext1)
    msg2 = MailParser().parseHeaders(hdrtext2)
    for hdr in tryheaders:                          # poss multiple Received
        if msg1.get_all(hdr) != msg2.get_all(hdr):  # case insens, dflt None
            self.trace('Diff common headers')
            return False

    # all common hdrs match and don't have a diff message-id
    self.trace('Same common headers')
    return True

def getPassword(self):
    if not self.popPassword:
        try:
            localfile = open(mailconfig.poppasswdfile)
            self.popPassword = localfile.readline()[:-1]
            self.trace('local file password' + repr(self.popPassword))
        except:
            self.popPassword = self.askPopPassword()

def askPopPassword(self):
    assert False, 'Subclass must define method'

################################################################################

specialized subclasses

################################################################################

class MailFetcherConsole(MailFetcher): def askPopPassword(self): import getpass prompt = 'Password for %s on %s?' % (self.popUser, self.popServer) return getpass.getpass(prompt)

class SilentMailFetcher(SilentMailTool, MailFetcher): pass # replaces trace ----------------------------------------------mailParser.py """ ############################################################################### parsing and attachment extract, analyse, save (see init for docs, test) ############################################################################### """

import os, mimetypes, sys # mime: map type to name import email.parser # parse text to Message object import email.header # 4E: headers decode/encode import email.utils # 4E: addr header parse/decode from email.message import Message # Message may be traversed from .mailTool import MailTool # 4E: package-relative

class MailParser(MailTool):

代码语言:javascript
复制
def walkNamedParts(self, message):
    for (ix, part) in enumerate(message.walk()):    # walk includes message
        fulltype = part.get_content_type()          # ix includes parts skipped
        maintype = part.get_content_maintype()
        if maintype == 'multipart':                 # multipart/*: container
            continue                                
        elif fulltype == 'message/rfc822':          # 4E: skip message/rfc822
            continue                                # skip all message/* too?
        else:
            filename, contype = self.partName(part, ix)
            yield (filename, contype, part)

def partName(self, part, ix):
    filename = part.get_filename()                # filename in msg hdrs?
    contype  = part.get_content_type()            # lowercase maintype/subtype
    if not filename:
        filename = part.get_param('name')         # try content-type name
    if not filename:
        if contype == 'text/plain':               # hardcode plain text ext
            ext = '.txt'                          # else guesses .ksh!
        else:
            ext = mimetypes.guess_extension(contype)
            if not ext: ext = '.bin'              # use a generic default
        filename = 'part-%03d%s' % (ix, ext)
    return (filename, contype)

def saveParts(self, savedir, message):
    if not os.path.exists(savedir):
        os.mkdir(savedir)
    partfiles = []
    for (filename, contype, part) in self.walkNamedParts(message):
        fullname = os.path.join(savedir, filename)
        fileobj  = open(fullname, 'wb')             # use binary mode
        content  = part.get_payload(decode=1)       # decode base64,qp,uu
        if not isinstance(content, bytes):          # 4E: need bytes for rb
            content = b'(no content)'               # decode=1 returns bytes,
        fileobj.write(content)                      # but some payloads None
        fileobj.close()                             # 4E: not str(content)
        partfiles.append((contype, fullname))       # for caller to open
    return partfiles

def saveOnePart(self, savedir, partname, message):
    """
    ditto, but find and save just one part by name
    """
    if not os.path.exists(savedir):
        os.mkdir(savedir)
    fullname = os.path.join(savedir, partname)
    (contype, content) = self.findOnePart(partname, message)
    if not isinstance(content, bytes):          # 4E: need bytes for rb
        content = b'(no content)'               # decode=1 returns bytes,
    open(fullname, 'wb').write(content)         # but some payloads None
    return (contype, fullname)                  # 4E: not str(content)

def partsList(self, message):
    validParts = self.walkNamedParts(message)
    return [filename for (filename, contype, part) in validParts]

def findOnePart(self, partname, message):
    for (filename, contype, part) in self.walkNamedParts(message):
        if filename == partname:
            content = part.get_payload(decode=1)          # does base64,qp,uu
            return (contype, content)                     # may be bytes text

def decodedPayload(self, part, asStr=True):
    payload = part.get_payload(decode=1)           # payload may be bytes
    if asStr and isinstance(payload, bytes):       # decode=1 returns bytes
        tries = []
        enchdr = part.get_content_charset()        # try msg headers first!
        if enchdr:
            tries += [enchdr]                      # try headers first   
        tries += [sys.getdefaultencoding()]        # same as bytes.decode()
        tries += ['latin1', 'utf8']                # try 8-bit, incl ascii
        for trie in tries:                         # try utf8 (windows dflt)
            try:
                payload = payload.decode(trie)     # give it a shot, eh?
                break
            except (UnicodeError, LookupError):    # lookuperr: bad name
                pass
        else:
            payload = '--Sorry: cannot decode Unicode text--' 
    return payload

def findMainText(self, message, asStr=True):

    # try to find a plain text
    for part in message.walk():                            # walk visits message
        type = part.get_content_type()                     # if nonmultipart
        if type == 'text/plain':                           # may be base64,qp,uu
            return type, self.decodedPayload(part, asStr)  # bytes to str too?

    # try to find an HTML part
    for part in message.walk():
        type = part.get_content_type()                     # caller renders html
        if type == 'text/html':
            return type, self.decodedPayload(part, asStr)

    # try any other text type, including XML
    for part in message.walk():
        if part.get_content_maintype() == 'text':
            return part.get_content_type(), self.decodedPayload(part, asStr)

    # punt: could use first part, but it's not marked as text
    failtext = '[No text to display]' if asStr else b'[No text to display]'
    return 'text/plain', failtext

def decodeHeader(self, rawheader):
    try:
        parts = email.header.decode_header(rawheader)
        decoded = []
        for (part, enc) in parts:                      # for all substrings
            if enc == None:                            # part unencoded?
                if not isinstance(part, bytes):        # str: full hdr unencoded
                    decoded += [part]                  # else do unicode decode
                else:
                    decoded += [part.decode('raw-unicode-escape')]
            else:
                decoded += [part.decode(enc)]
        return ' '.join(decoded)
    except:
        return rawheader         # punt!

def decodeAddrHeader(self, rawheader):
    try:
        pairs = email.utils.getaddresses([rawheader])  # split addrs and parts
        decoded = []                                   # handles name commas 
        for (name, addr) in pairs:
            try:
                name = self.decodeHeader(name)                # email+MIME+Uni
            except:
                name = None   # but uses encooded name if exc in decodeHeader
            joined = email.utils.formataddr((name, addr))     # join parts
            decoded.append(joined)
        return ', '.join(decoded)                             # >= 1 addrs 
    except:
        return self.decodeHeader(rawheader)    # try decoding entire string

def splitAddresses(self, field):
    try:
        pairs = email.utils.getaddresses([field])                # [(name,addr)]
        return [email.utils.formataddr(pair) for pair in pairs]  # [name <addr>]
    except:
        return ''   # syntax error in user-entered field?, etc.

# returned when parses fail
errorMessage = Message()
errorMessage.set_payload('[Unable to parse message - format error]')

def parseHeaders(self, mailtext):
    try:
        return email.parser.Parser().parsestr(mailtext, headersonly=True)
    except:
        return self.errorMessage

def parseMessage(self, fulltext):
    try:
        return email.parser.Parser().parsestr(fulltext)       # may fail!
    except:
        return self.errorMessage     # or let call handle? can check return

def parseMessageRaw(self, fulltext):
    try:
        return email.parser.HeaderParser().parsestr(fulltext)
    except:
        return self.errorMessage

-----------------------------------------------usercase.py import sys sys.path.append('..') import mailconfig print('config:', mailconfig.file)

get these from init

from mailtools import (MailFetcherConsole, MailSender, MailSenderAuthConsole, MailParser)

if not mailconfig.smtpuser: sender = MailSender(tracesize=5000) else: sender = MailSenderAuthConsole(tracesize=5000)

sender.sendMessage(From = mailconfig.myaddress, To = [mailconfig.myaddress], Subj = 'testing mailtools package', extrahdrs = [('X-Mailer', 'mailtools')], bodytext = 'Here is my source code\n', attaches = ['selftest.py'], )

代码语言:javascript
复制
               # bodytextEncoding='utf-8',          # other tests to try
               # attachesEncodings=['latin-1'],     # inspect text headers
               # attaches=['monkeys.jpg'])          # verify Base64 encoded
               # to='i18n adddr list...',           # test mime/unicode headers

change mailconfig to test fetchlimit

fetcher = MailFetcherConsole() def status(*args): print(args)

hdrs, sizes, loadedall = fetcher.downloadAllHeaders(status) for num, hdr in enumerate(hdrs[:5]): print(hdr) if input('load mail?') in ['y', 'Y']: print(fetcher.downloadMessage(num+1).rstrip(), '\n', '-'*70)

last5 = len(hdrs)-4 msgs, sizes, loadedall = fetcher.downloadAllMessages(status, loadfrom=last5) for msg in msgs: print(msg[:200], '\n', '-'*70)

parser = MailParser() for i in [0]: # try [0 , len(msgs)] fulltext = msgs[i] message = parser.parseMessage(fulltext) ctype, maintext = parser.findMainText(message) print('Parsed:', message['Subject']) print(maintext) input('Press Enter to exit') # pause if clicked on Windows

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • index/server msgnum out of synch tests
  • specialized subclasses
  • get these from init
  • change mailconfig to test fetchlimit
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档