# -----------------------------mailconfig.py
# Per-user mail settings; the mailtools classes read these as
# attributes of the imported mailconfig module.

# POP3 (incoming) server and account
popservername = 'pop.secureserver.net'
popusername = 'PP4E@learning-python.com'

# SMTP (outgoing) server and the sender's own address/signature
smtpservername = 'smtpout.secureserver.net'
myaddress = 'PP4E@learning-python.com'
mysignature = ('Thanks,\n'
               '--Mark Lutz (http://learning-python.com, http://rmi.net/~lutz)')

smtpuser = None                             # per your ISP
smtppasswdfile = ''                         # set to '' to be asked
poppasswdfile = r'c:\temp\pymailgui.txt'    # set to '' to be asked
sentmailfile = r'.\sentmail.txt'            # . means in current working dir
savemailfile = r'c:\temp\savemail.txt'      # not used in PyMailGUI: dialog

fetchEncoding = 'utf8'      # 4E: how to decode and store message text (or latin1?)
headersEncodeTo = None      # 4E: how to encode non-ASCII headers sent (None=utf8)
fetchlimit = 25             # 4E: maximum number headers/emails to fetch on loads

# ------------------------------------------------mailTool.py
class MailTool:
    """Superclass for all mail tools: supplies the trace() logging hook."""

    def trace(self, message):
        """Print message; redefine to disable or to log to a file."""
        print(message)


class SilentMailTool:
    """Mix in ahead of a MailTool subclass to silence trace() output."""

    def trace(self, message):
        """Discard message."""
        pass

# ----------------------------------------mailSender.py
import mimetypes                                      # mime: name to type
import os
import smtplib

import email.encoders                                 # base64
import email.header                                   # make_header: non-ASCII header encoding
import email.utils                                    # date string
from email.message import Message                     # general message, obj->text
from email.mime.application import MIMEApplication    # 4E: use new app class
from email.mime.audio import MIMEAudio                # format/encode attachments
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart        # type-specific messages
from email.mime.text import MIMEText

import mailconfig                                     # client's mailconfig
from .mailTool import MailTool, SilentMailTool        # 4E: package-relative
def fix_encode_base64(msgobj, linelen=76):
    """
    Base64-encode msgobj's payload in place and store it as wrapped str text.

    Runs the standard email.encoders.encode_base64 first, then normalizes
    the result: a bytes payload is decoded to str (per the original notes,
    a bytes payload breaks the email package's text generation), and the
    text is re-split into lines of at most linelen characters joined by
    '\n', with no trailing newline.

    msgobj:  an email Message-like object whose payload is the raw data.
    linelen: maximum encoded line length; 76 per MIME standards.
    """
    from email.encoders import encode_base64

    encode_base64(msgobj)                   # what email does normally
    text = msgobj.get_payload()
    if isinstance(text, bytes):             # bytes in 3.1, str in later versions
        text = text.decode('ascii')         # base64 output is always ASCII
    text = text.replace('\n', '')           # drop any existing wrapping first

    # stepped slices are linear; repeatedly re-slicing the remainder is not
    lines = [text[i:i + linelen] for i in range(0, len(text), linelen)]
    msgobj.set_payload('\n'.join(lines))
def fix_text_required(encodingname):
    """
    Return True if text in this charset must be passed to email as str.

    The decision follows the charset's MIME body encoding as reported by
    email.charset.Charset: None or quoted-printable (e.g. ascii, latin1)
    means str is required, anything else (e.g. utf8, which uses base64)
    means bytes is required.
    """
    from email.charset import Charset, QP

    bodyenc = Charset(encodingname).body_encoding   # how email will encode the body
    return bodyenc in (None, QP)
class MailSender(MailTool):
    """
    Build and send an email message through an SMTP server.

    This class performs no login: authenticateServer() and getPassword()
    are no-op hooks for subclasses to override.  Non-ASCII headers are
    encoded per mailconfig.headersEncodeTo (utf-8 if unset), and every
    sent message is appended to mailconfig.sentmailfile.
    """

    class SomeAddrsFailed(Exception):
        """Raised by sendMessage when the server refused some recipients."""

    def __init__(self, smtpserver=None, tracesize=256):
        """
        smtpserver: SMTP host name; defaults to mailconfig.smtpservername.
        tracesize:  number of leading message characters passed to trace().
        """
        self.smtpServerName = smtpserver or mailconfig.smtpservername
        self.tracesize = tracesize

    def sendMessage(self, From, To, Subj, extrahdrs, bodytext, attaches,
                    saveMailSeparator=(('=' * 80) + 'PY\n'),
                    bodytextEncoding='us-ascii',
                    attachesEncodings=None):
        """
        Format and send one message.

        From:       sender address string.
        To:         list of recipient address strings.
        Subj:       subject text.
        extrahdrs:  sequence of (name, value) pairs.  For 'Cc' and 'Bcc'
                    (any case) value is a list of addresses that are added
                    to the recipients; 'Bcc' addresses get the mail but no
                    header line.  Pairs with an empty value are skipped.
        bodytext:   main text, str or bytes (converted as the charset needs).
        attaches:   list of file names to attach; a plain, non-multipart
                    message is built when it is empty.
        saveMailSeparator: line written before the message in the sent file.
        bodytextEncoding:  charset name for the main text.
        attachesEncodings: charset names for text attachments, in step
                    with attaches.

        Raises SomeAddrsFailed if the server refused some recipients (the
        message is still saved first); SMTP and socket errors propagate
        after the connection has been closed.
        """
        # email requires either str or bytes for the body, by charset
        if fix_text_required(bodytextEncoding):
            if not isinstance(bodytext, str):
                bodytext = bodytext.decode(bodytextEncoding)
        else:
            if not isinstance(bodytext, bytes):
                bodytext = bodytext.encode(bodytextEncoding)

        # make message root
        if not attaches:
            msg = Message()
            msg.set_payload(bodytext, charset=bodytextEncoding)
        else:
            msg = MIMEMultipart()
            self.addAttachments(msg, bodytext, attaches,
                                bodytextEncoding, attachesEncodings)

        # encode non-ASCII headers; address headers encode name parts only
        hdrenc = mailconfig.headersEncodeTo or 'utf-8'          # default=utf8
        Subj = self.encodeHeader(Subj, hdrenc)                  # full header
        From = self.encodeAddrHeader(From, hdrenc)              # email names
        To = [self.encodeAddrHeader(T, hdrenc) for T in To]     # each recip
        Tos = ', '.join(To)                                     # hdr+envelope

        # add headers to root
        msg['From'] = From
        msg['To'] = Tos                             # poss many: addr list
        msg['Subject'] = Subj                       # servers reject ';' sept
        msg['Date'] = email.utils.formatdate()      # curr datetime, rfc2822 utc
        recip = list(To)                            # envelope recipients
        for name, value in extrahdrs:               # Cc, Bcc, X-Mailer, etc.
            if not value:
                continue                            # some servers reject ['']
            if name.lower() not in ['cc', 'bcc']:
                msg[name] = self.encodeHeader(value, hdrenc)
            else:
                value = [self.encodeAddrHeader(V, hdrenc) for V in value]
                recip += value
                if name.lower() != 'bcc':           # 4E: bcc gets mail, no hdr
                    msg[name] = ', '.join(value)    # add commas between cc
        recip = list(set(recip))                    # 4E: remove duplicates
        fullText = msg.as_string()                  # generate formatted msg

        self.trace('Sending to...' + str(recip))
        self.trace(fullText[:self.tracesize])

        server = smtplib.SMTP(self.smtpServerName, timeout=20)  # connects; may fail
        try:
            self.getPassword()                      # if srvr requires
            self.authenticateServer(server)         # login in subclass
            failed = server.sendmail(From, recip, fullText)     # except or dict
        except BaseException:
            server.close()                          # 4E: quit may hang!
            raise                                   # reraise except
        server.quit()                               # connect + send OK

        self.saveSentMessage(fullText, saveMailSeparator)       # 4E: do this first
        if failed:
            raise self.SomeAddrsFailed('Failed addrs:%s\n' % failed)
        self.trace('Send exit')

    def addAttachments(self, mainmsg, bodytext, attaches,
                       bodytextEncoding, attachesEncodings):
        """
        Attach the main text and each file in attaches to multipart mainmsg.

        Names that are not regular files are skipped.  The part type is
        guessed from the file name; unknown or compressed types become
        application/octet-stream.  Text files are read as str or bytes as
        their charset requires; all other types are read as bytes and
        base64-encoded with fix_encode_base64.  Files lacking an entry in
        attachesEncodings are treated as 'us-ascii'.
        """
        # add main text/plain part
        msg = MIMEText(bodytext, _charset=bodytextEncoding)
        mainmsg.attach(msg)

        # pad so zip() below never silently drops trailing attachments
        encodings = list(attachesEncodings or [])
        encodings += ['us-ascii'] * (len(attaches) - len(encodings))

        # add attachment parts
        for (filename, fileencode) in zip(attaches, encodings):
            # filename may be absolute or relative
            if not os.path.isfile(filename):                # skip dirs, etc.
                continue

            # guess content type from file extension, ignore encoding
            contype, encoding = mimetypes.guess_type(filename)
            if contype is None or encoding is not None:     # no guess, compressed?
                contype = 'application/octet-stream'        # use generic default
            self.trace('Adding ' + contype)

            # build sub-Message of appropriate kind
            maintype, subtype = contype.split('/', 1)
            if maintype == 'text':                          # 4E: text needs encoding
                if fix_text_required(fileencode):           # requires str or bytes
                    with open(filename, 'r', encoding=fileencode) as data:
                        content = data.read()
                else:
                    with open(filename, 'rb') as data:
                        content = data.read()
                msg = MIMEText(content, _subtype=subtype, _charset=fileencode)
            else:
                with open(filename, 'rb') as data:          # 4E: use fix for binaries
                    content = data.read()
                if maintype == 'image':
                    msg = MIMEImage(
                        content, _subtype=subtype, _encoder=fix_encode_base64)
                elif maintype == 'audio':
                    msg = MIMEAudio(
                        content, _subtype=subtype, _encoder=fix_encode_base64)
                elif maintype == 'application':             # new in 4E
                    msg = MIMEApplication(
                        content, _subtype=subtype, _encoder=fix_encode_base64)
                else:
                    msg = MIMEBase(maintype, subtype)       # make generic type
                    msg.set_payload(content)
                    fix_encode_base64(msg)                  # encode using base64

            # set filename and attach to container
            basename = os.path.basename(filename)
            msg.add_header('Content-Disposition',
                           'attachment', filename=basename)
            mainmsg.attach(msg)

        # text outside mime structure, seen by non-MIME mail readers
        mainmsg.preamble = 'A multi-part MIME format message.\n'
        mainmsg.epilogue = ''       # make sure message ends with a newline

    def saveSentMessage(self, fullText, saveMailSeparator):
        """
        Append the separator and full message text to mailconfig.sentmailfile.

        Best effort: any failure is reported through trace() only, so a
        save problem never undoes a successful send.
        """
        try:
            with open(mailconfig.sentmailfile, 'a',
                      encoding=mailconfig.fetchEncoding) as sentfile:   # 4E
                if not fullText.endswith('\n'):
                    fullText += '\n'
                sentfile.write(saveMailSeparator)
                sentfile.write(fullText)
        except Exception:
            self.trace('Could not save sent message')       # not a show-stopper

    def encodeHeader(self, headertext, unicodeencoding='utf-8'):
        """
        Return headertext, MIME-encoded per unicodeencoding if it is not ASCII.

        ASCII text is returned unchanged.  If encoding fails the original
        text is returned as is; smtplib may then fail if it cannot encode
        the message to ASCII.
        """
        try:
            headertext.encode('ascii')
        except Exception:
            try:
                hdrobj = email.header.make_header([(headertext, unicodeencoding)])
                headertext = hdrobj.encode()    # auto splits into cont lines if needed
            except Exception:
                pass
        return headertext

    def encodeAddrHeader(self, headertext, unicodeencoding='utf-8'):
        """
        Return an address header with non-ASCII name parts MIME-encoded.

        headertext may hold several comma-separated addresses.  Only the
        display name of each is encoded, never the address itself; a name
        that cannot be encoded is dropped and the bare address is used.
        Results longer than 72 characters, or already containing a newline,
        are joined one address per continuation line.  If the header cannot
        be parsed at all, the whole text goes through encodeHeader with the
        same encoding.
        """
        try:
            pairs = email.utils.getaddresses([headertext])  # split addrs + parts
            encoded = []
            for name, addr in pairs:
                try:
                    name.encode('ascii')                    # use as is if okay as ascii
                except UnicodeError:                        # else try to encode name part
                    try:
                        uni = name.encode(unicodeencoding)
                        hdr = email.header.make_header([(uni, unicodeencoding)])
                        name = hdr.encode()
                    except Exception:
                        name = None                         # drop name, use address only
                joined = email.utils.formataddr((name, addr))   # quote name if need
                encoded.append(joined)

            fullhdr = ', '.join(encoded)
            if len(fullhdr) > 72 or '\n' in fullhdr:        # not one short line?
                fullhdr = ',\n '.join(encoded)              # try multiple lines
            return fullhdr
        except Exception:
            return self.encodeHeader(headertext, unicodeencoding)

    def authenticateServer(self, server):
        """Hook: log in to the connected server; no login in this class."""
        pass

    def getPassword(self):
        """Hook: obtain the SMTP password; none is needed in this class."""
        pass
class MailSenderAuth(MailSender):
    """
    MailSender variant for servers that require an SMTP login.

    The password comes from the first line of mailconfig.smtppasswdfile
    when that file can be read, otherwise from askSmtpPassword(), which
    subclasses must supply.
    """

    smtpPassword = None     # 4E: on class, not self, shared by poss N instances

    def __init__(self, smtpserver=None, smtpuser=None, tracesize=256):
        """
        smtpserver: SMTP host name, passed on to MailSender.
        smtpuser:   login name; defaults to mailconfig.smtpuser.
        tracesize:  passed on to MailSender, so callers may use the same
                    tracesize= keyword with either sender class.
        """
        MailSender.__init__(self, smtpserver, tracesize)
        self.smtpUser = smtpuser or mailconfig.smtpuser
        # self.smtpPassword = None # 4E: makes pyMailGUI ask for each send!

    def authenticateServer(self, server):
        """Log in to the connected server with the stored user and password."""
        server.login(self.smtpUser, self.smtpPassword)

    def getPassword(self):
        """
        Set the class-wide smtpPassword if it is not already set.

        Reads the first line of mailconfig.smtppasswdfile, stripped of its
        line ending only; on any failure falls back to askSmtpPassword().
        """
        if not self.smtpPassword:
            try:
                with open(mailconfig.smtppasswdfile) as localfile:
                    # strip only the newline: a file with no trailing
                    # newline must not lose its last character
                    MailSenderAuth.smtpPassword = localfile.readline().rstrip('\r\n')  # 4E
                # NOTE(review): this traces the password in clear text
                self.trace('local file password' + repr(self.smtpPassword))
            except Exception:
                MailSenderAuth.smtpPassword = self.askSmtpPassword()    # 4E

    def askSmtpPassword(self):
        """Return the SMTP password obtained from the user; subclass hook."""
        # raise rather than assert: asserts are stripped under python -O
        raise NotImplementedError('Subclass must define method')
class MailSenderAuthConsole(MailSenderAuth):
    """MailSenderAuth that asks for the SMTP password at the console."""

    def askSmtpPassword(self):
        """Prompt with getpass for the password and return it."""
        import getpass
        prompt = 'Password for %s on %s?' % (self.smtpUser, self.smtpServerName)
        return getpass.getpass(prompt)
class SilentMailSender(SilentMailTool, MailSender):
    """MailSender whose trace() is replaced by the silent mixin's."""
    pass    # replaces trace

# ----------------------------------------------mailFetcher.py
import poplib, mailconfig, sys                      # client's mailconfig on sys.path
print('user:', mailconfig.popusername)              # script dir, pythonpath, changes

from .mailParser import MailParser                  # for headers matching (4E: .)
from .mailTool import MailTool, SilentMailTool      # trace control supers (4E: .)
class DeleteSynchError(Exception):
    """Message out of synch with the server during a delete."""


class TopNotSupported(Exception):
    """Server lacks TOP, so the synch test cannot be run."""


class MessageSynchError(Exception):
    """Message index list is out of synch with the server."""
class MailFetcher(MailTool):
    """
    Fetch, delete and synch-check mail on a POP3 server.

    Every operation opens its own connection through connect() and quits
    it in a finally clause.  Message lines arrive as bytes and are decoded
    to str by decodeFullText().  The password comes from the constructor,
    from mailconfig.poppasswdfile, or from askPopPassword(), which
    subclasses must supply.
    """

    def __init__(self, popserver=None, popuser=None, poppswd=None, hastop=True):
        """
        popserver: POP host name; defaults to mailconfig.popservername.
        popuser:   login name; defaults to mailconfig.popusername.
        poppswd:   password; looked up on first connect if None.
        hastop:    False if the server does not support the TOP command.
        """
        self.popServer = popserver or mailconfig.popservername
        self.popUser = popuser or mailconfig.popusername
        self.srvrHasTop = hastop
        self.popPassword = poppswd      # ask later if None

    def connect(self):
        """Log in to the POP server and return the poplib.POP3 object."""
        self.trace('Connecting...')
        self.getPassword()                              # file, GUI, or console
        server = poplib.POP3(self.popServer, timeout=20)
        server.user(self.popUser)                       # connect,login POP server
        server.pass_(self.popPassword)                  # pass is a reserved word
        self.trace(server.getwelcome())                 # print returned greeting
        return server

    # encoding tried first by decodeFullText; may be reset on class or instance
    fetchEncoding = mailconfig.fetchEncoding

    def decodeFullText(self, messageBytes):
        """
        Decode a list of bytes lines to a list of str lines.

        Tries self.fetchEncoding, then ascii, latin1, utf8 and the platform
        default, using the first encoding that decodes every line.  If none
        works, returns just the decoded headers (the lines before the first
        empty line) followed by an error notice, so that an undecodable
        mail does not raise in the client.
        """
        text = None
        kinds = [self.fetchEncoding]                # try user setting first
        kinds += ['ascii', 'latin1', 'utf8']        # then try common types
        kinds += [sys.getdefaultencoding()]         # and platform dflt (may differ)
        for kind in kinds:
            try:
                text = [line.decode(kind) for line in messageBytes]
                break
            except (UnicodeError, LookupError):     # LookupError: bad name
                pass

        if text is None:
            # try returning headers + error msg, else except may kill client;
            # still try to decode headers per ascii, other, platform default
            try:
                blankline = messageBytes.index(b'')
            except ValueError:
                blankline = len(messageBytes)       # no blank line: all headers
            hdrsonly = messageBytes[:blankline]
            for common in ['ascii', 'latin1', 'utf8']:
                try:
                    text = [line.decode(common) for line in hdrsonly]
                    break
                except UnicodeError:
                    pass
            else:                                   # none worked
                try:
                    text = [line.decode() for line in hdrsonly]     # platform dflt?
                except UnicodeError:
                    text = ['From: (sender of unknown Unicode format headers)']
            text += ['', '--Sorry: mailtools cannot decode this mail content!--']
        return text

    def downloadMessage(self, msgnum):
        """Return the full text of message number msgnum as one str."""
        self.trace('load ' + str(msgnum))
        server = self.connect()
        try:
            resp, msglines, respsz = server.retr(msgnum)
        finally:
            server.quit()
        msglines = self.decodeFullText(msglines)    # raw bytes to Unicode str
        return '\n'.join(msglines)                  # concat lines for parsing

    def downloadAllHeaders(self, progress=None, loadfrom=1):
        """
        Fetch header text for messages loadfrom..N.

        Returns (headers, sizes, False): a list of header strings, a list
        of message sizes from the server's LIST reply, and a flag that is
        False here and True when full messages were loaded instead.  If
        the server has no TOP command this defers to downloadAllMessages.
        All but the last mailconfig.fetchlimit messages get a dummy
        '--mail skipped--' header instead of a fetch.  progress, if given,
        is called as progress(msgnum, msgCount) for each message.
        """
        if not self.srvrHasTop:                     # not all servers support TOP
            # naively load full msg text
            return self.downloadAllMessages(progress, loadfrom)

        self.trace('loading headers')
        fetchlimit = mailconfig.fetchlimit
        server = self.connect()                     # mbox now locked until quit
        try:
            resp, msginfos, respsz = server.list()  # 'num size' lines list
            msgCount = len(msginfos)                # alt to srvr.stat[0]
            msginfos = msginfos[loadfrom-1:]        # drop already loadeds
            allsizes = [int(x.split()[1]) for x in msginfos]
            allhdrs = []
            for msgnum in range(loadfrom, msgCount+1):      # poss empty
                if progress:
                    progress(msgnum, msgCount)              # run callback
                if fetchlimit and (msgnum <= msgCount - fetchlimit):
                    # skip, add dummy hdrs
                    hdrtext = 'Subject: --mail skipped--\n\n'
                    allhdrs.append(hdrtext)
                else:
                    # fetch, retr hdrs only
                    resp, hdrlines, respsz = server.top(msgnum, 0)
                    hdrlines = self.decodeFullText(hdrlines)
                    allhdrs.append('\n'.join(hdrlines))
        finally:
            server.quit()                           # make sure unlock mbox
        assert len(allhdrs) == len(allsizes)
        self.trace('load headers exit')
        return allhdrs, allsizes, False

    def downloadAllMessages(self, progress=None, loadfrom=1):
        """
        Fetch full text for messages loadfrom..N, leaving them on the server.

        Returns (messages, sizes, True).  All but the last
        mailconfig.fetchlimit messages get dummy '--mail skipped--' text
        instead of a fetch.  progress, if given, is called as
        progress(msgnum, msgCount) for each message.
        """
        self.trace('loading full messages')
        fetchlimit = mailconfig.fetchlimit
        server = self.connect()
        try:
            (msgCount, msgBytes) = server.stat()    # inbox on server
            allmsgs = []
            allsizes = []
            for i in range(loadfrom, msgCount+1):   # empty if low >= high
                if progress:
                    progress(i, msgCount)
                if fetchlimit and (i <= msgCount - fetchlimit):
                    # skip, add dummy mail
                    mailtext = 'Subject: --mail skipped--\n\nMail skipped.\n'
                    allmsgs.append(mailtext)
                    allsizes.append(len(mailtext))
                else:
                    # fetch, retr full mail
                    (resp, message, respsz) = server.retr(i)    # save text on list
                    message = self.decodeFullText(message)
                    allmsgs.append('\n'.join(message))          # leave mail on server
                    allsizes.append(respsz)                     # diff from len(msg)
        finally:
            server.quit()                           # unlock the mail box
        assert len(allmsgs) == (msgCount - loadfrom) + 1        # msg nums start at 1
        #assert sum(allsizes) == msgBytes           # not if loadfrom > 1
        return allmsgs, allsizes, True              # not if fetchlimit

    def deleteMessages(self, msgnums, progress=None):
        """
        Delete the given message numbers over one connection, unchecked.

        Deleting changes message numbers on the server, so callers should
        reload afterwards.
        """
        self.trace('deleting mails')
        server = self.connect()
        try:
            for (ix, msgnum) in enumerate(msgnums):     # don't reconnect for each
                if progress:
                    progress(ix+1, len(msgnums))
                server.dele(msgnum)
        finally:                                        # changes msgnums: reload
            server.quit()

    def deleteMessagesSafely(self, msgnums, synchHeaders, progress=None):
        """
        Delete message numbers only while their headers still match.

        Before each delete, the message's headers are fetched with TOP and
        compared via headersMatch() with synchHeaders[msgnum-1].  Raises
        TopNotSupported if the server lacks TOP, and DeleteSynchError at
        the first message whose number exceeds the inbox size or whose
        headers differ; messages before that one have been deleted.
        """
        if not self.srvrHasTop:
            raise TopNotSupported('Safe delete cancelled')

        self.trace('deleting mails safely')
        errmsg = 'Message %s out of synch with server.\n'
        errmsg += 'Delete terminated at this message.\n'
        errmsg += 'Mail client may require restart or reload.'

        server = self.connect()                         # locks inbox till quit
        try:                                            # don't reconnect for each
            (msgCount, msgBytes) = server.stat()        # inbox size on server
            for (ix, msgnum) in enumerate(msgnums):
                if progress:
                    progress(ix+1, len(msgnums))
                if msgnum > msgCount:                   # msgs deleted
                    raise DeleteSynchError(errmsg % msgnum)
                resp, hdrlines, respsz = server.top(msgnum, 0)      # hdrs only
                hdrlines = self.decodeFullText(hdrlines)
                msghdrs = '\n'.join(hdrlines)
                if not self.headersMatch(msghdrs, synchHeaders[msgnum-1]):
                    raise DeleteSynchError(errmsg % msgnum)
                server.dele(msgnum)                     # safe to delete this msg
        finally:                                        # changes msgnums: reload
            server.quit()                               # unlock inbox on way out

    def checkSynchError(self, synchHeaders):
        """
        Raise MessageSynchError if synchHeaders no longer matches the inbox.

        Fails if the server now holds fewer messages than synchHeaders, or,
        when TOP is available, if the headers of the last listed message
        differ from synchHeaders[-1].
        """
        self.trace('synch check')
        errormsg = 'Message index out of synch with mail server.\n'
        errormsg += 'Mail client may require restart or reload.'
        server = self.connect()
        try:
            lastmsgnum = len(synchHeaders)                      # 1..N
            (msgCount, msgBytes) = server.stat()                # inbox size
            if lastmsgnum > msgCount:                           # fewer now?
                raise MessageSynchError(errormsg)               # none to cmp
            if self.srvrHasTop:
                resp, hdrlines, respsz = server.top(lastmsgnum, 0)  # hdrs only
                hdrlines = self.decodeFullText(hdrlines)
                lastmsghdrs = '\n'.join(hdrlines)
                if not self.headersMatch(lastmsghdrs, synchHeaders[-1]):
                    raise MessageSynchError(errormsg)
        finally:
            server.quit()

    def headersMatch(self, hdrtext1, hdrtext2):
        """
        Return True if two header texts appear to describe the same message.

        Tried in order: exact text equality; equality ignoring lines that
        start with 'Status:'; a mismatch if either has Message-Id lines
        and they differ; finally a comparison of the parsed From, To,
        Subject, Date, Cc, Return-Path and Received headers.
        """
        # try match by simple string compare
        if hdrtext1 == hdrtext2:
            self.trace('Same headers text')
            return True

        # try match without status lines
        split1 = hdrtext1.splitlines()      # s.split('\n'), but no final ''
        split2 = hdrtext2.splitlines()
        strip1 = [line for line in split1 if not line.startswith('Status:')]
        strip2 = [line for line in split2 if not line.startswith('Status:')]
        if strip1 == strip2:
            self.trace('Same without Status')
            return True

        # try mismatch by message-id headers if either has one
        msgid1 = [line for line in split1 if line[:11].lower() == 'message-id:']
        msgid2 = [line for line in split2 if line[:11].lower() == 'message-id:']
        if (msgid1 or msgid2) and (msgid1 != msgid2):
            self.trace('Different Message-Id')
            return False

        # try full hdr parse and common headers if msgid missing or trash
        tryheaders = ('From', 'To', 'Subject', 'Date')
        tryheaders += ('Cc', 'Return-Path', 'Received')
        msg1 = MailParser().parseHeaders(hdrtext1)
        msg2 = MailParser().parseHeaders(hdrtext2)
        for hdr in tryheaders:                              # poss multiple Received
            if msg1.get_all(hdr) != msg2.get_all(hdr):      # case insens, dflt None
                self.trace('Diff common headers')
                return False

        # all common hdrs match and don't have a diff message-id
        self.trace('Same common headers')
        return True

    def getPassword(self):
        """
        Set self.popPassword if it is not already set.

        Reads the first line of mailconfig.poppasswdfile, stripped of its
        line ending only; on any failure falls back to askPopPassword().
        """
        if not self.popPassword:
            try:
                with open(mailconfig.poppasswdfile) as localfile:
                    # strip only the newline: a file with no trailing
                    # newline must not lose its last character
                    self.popPassword = localfile.readline().rstrip('\r\n')
                # NOTE(review): this traces the password in clear text
                self.trace('local file password' + repr(self.popPassword))
            except Exception:
                self.popPassword = self.askPopPassword()

    def askPopPassword(self):
        """Return the POP password obtained from the user; subclass hook."""
        # raise rather than assert: asserts are stripped under python -O
        raise NotImplementedError('Subclass must define method')
################################################################################
################################################################################
class MailFetcherConsole(MailFetcher):
    """MailFetcher that asks for the POP password at the console."""

    def askPopPassword(self):
        """Prompt with getpass for the password and return it."""
        import getpass
        prompt = 'Password for %s on %s?' % (self.popUser, self.popServer)
        return getpass.getpass(prompt)
class SilentMailFetcher(SilentMailTool, MailFetcher):
    """MailFetcher whose trace() is replaced by the silent mixin's."""
    pass    # replaces trace

# ----------------------------------------------mailParser.py
"""
###############################################################################
parsing and attachment extract, analyse, save (see __init__ for docs, test)
###############################################################################
"""
import os, mimetypes, sys               # mime: map type to name
import email.parser                     # parse text to Message object
import email.header                     # 4E: headers decode/encode
import email.utils                      # 4E: addr header parse/decode
from email.message import Message       # Message may be traversed
from .mailTool import MailTool          # 4E: package-relative
class MailParser(MailTool):
    """
    Parse message text and extract, decode and save its parts and headers.

    Works on email.message.Message objects as returned by parseMessage()
    and parseHeaders().  Part names and contents come from the mail itself
    and are treated as untrusted when files are written.
    """

    def walkNamedParts(self, message):
        """
        Generate (filename, contype, part) for each savable part of message.

        multipart/* containers and message/rfc822 parts are skipped.  The
        index used for generated names counts skipped parts too.
        """
        for (ix, part) in enumerate(message.walk()):    # walk includes message
            fulltype = part.get_content_type()          # ix includes parts skipped
            maintype = part.get_content_maintype()
            if maintype == 'multipart':                 # multipart/*: container
                continue
            if fulltype == 'message/rfc822':            # 4E: skip message/rfc822
                continue                                # skip all message/* too?
            filename, contype = self.partName(part, ix)
            yield (filename, contype, part)

    def partName(self, part, ix):
        """
        Return (filename, contype) for a message part.

        The name is taken from the part's filename, else from its
        Content-Type name parameter, else generated as 'part-NNN.ext' with
        an extension guessed from the content type ('.bin' if unknown).
        """
        filename = part.get_filename()              # filename in msg hdrs?
        contype = part.get_content_type()           # lowercase maintype/subtype
        if not filename:
            filename = part.get_param('name')       # try content-type name
        if not filename:
            if contype == 'text/plain':             # hardcode plain text ext
                ext = '.txt'                        # else guesses .ksh!
            else:
                ext = mimetypes.guess_extension(contype)
                if not ext:
                    ext = '.bin'                    # use a generic default
            filename = 'part-%03d%s' % (ix, ext)
        return (filename, contype)

    def _safePartPath(self, savedir, filename):
        """
        Return a path for filename that is always directly inside savedir.

        Part names are chosen by whoever sent the mail, so any directory
        components (relative, absolute, or with backslash separators) are
        dropped to keep a crafted name from writing outside savedir.
        """
        basename = os.path.basename(filename.replace('\\', '/'))
        if basename in ('', '.', '..'):             # nothing usable left
            basename = 'part.bin'
        return os.path.join(savedir, basename)

    def saveParts(self, savedir, message):
        """
        Save every named part of message as a file in savedir.

        savedir is created if missing.  Returns a list of
        (contype, fullname) pairs for the caller to open.  A part with no
        decodable payload is written as b'(no content)'.
        """
        if not os.path.exists(savedir):
            os.mkdir(savedir)
        partfiles = []
        for (filename, contype, part) in self.walkNamedParts(message):
            fullname = self._safePartPath(savedir, filename)
            content = part.get_payload(decode=1)        # decode base64,qp,uu
            if not isinstance(content, bytes):          # 4E: need bytes for rb
                content = b'(no content)'               # some payloads None
            with open(fullname, 'wb') as fileobj:       # use binary mode
                fileobj.write(content)
            partfiles.append((contype, fullname))       # for caller to open
        return partfiles

    def saveOnePart(self, savedir, partname, message):
        """
        Ditto, but find and save just one part by name.

        Returns (contype, fullname).  Raises TypeError if message has no
        part named partname, because findOnePart then returns None.
        """
        if not os.path.exists(savedir):
            os.mkdir(savedir)
        fullname = self._safePartPath(savedir, partname)
        (contype, content) = self.findOnePart(partname, message)
        if not isinstance(content, bytes):              # 4E: need bytes for rb
            content = b'(no content)'                   # some payloads None
        with open(fullname, 'wb') as fileobj:
            fileobj.write(content)
        return (contype, fullname)

    def partsList(self, message):
        """Return the list of part file names that saveParts would use."""
        validParts = self.walkNamedParts(message)
        return [filename for (filename, contype, part) in validParts]

    def findOnePart(self, partname, message):
        """
        Return (contype, content) for the first part named partname.

        content is the decoded payload and may be bytes.  Returns None
        when no part has that name.
        """
        for (filename, contype, part) in self.walkNamedParts(message):
            if filename == partname:
                content = part.get_payload(decode=1)    # does base64,qp,uu
                return (contype, content)               # may be bytes text

    def decodedPayload(self, part, asStr=True):
        """
        Return part's payload with its transfer encoding removed.

        With asStr true, a bytes payload is also decoded to str, trying
        the part's declared charset, the platform default, latin1 and
        utf8 in turn; an apology string is returned if none works.
        """
        payload = part.get_payload(decode=1)            # payload may be bytes
        if asStr and isinstance(payload, bytes):        # decode=1 returns bytes
            tries = []
            enchdr = part.get_content_charset()         # try msg headers first!
            if enchdr:
                tries += [enchdr]
            tries += [sys.getdefaultencoding()]         # same as bytes.decode()
            tries += ['latin1', 'utf8']                 # try 8-bit, incl ascii
            for trie in tries:
                try:
                    payload = payload.decode(trie)      # give it a shot, eh?
                    break
                except (UnicodeError, LookupError):     # lookuperr: bad name
                    pass
            else:
                payload = '--Sorry: cannot decode Unicode text--'
        return payload

    def findMainText(self, message, asStr=True):
        """
        Return (contype, text) for the message's main displayable text.

        Prefers the first text/plain part, then the first text/html part
        (the caller renders html), then any other text/* part; returns
        ('text/plain', '[No text to display]') when there is none.
        """
        # try to find a plain text
        for part in message.walk():                     # walk visits message
            contype = part.get_content_type()           # if nonmultipart
            if contype == 'text/plain':                 # may be base64,qp,uu
                return contype, self.decodedPayload(part, asStr)

        # try to find an HTML part
        for part in message.walk():
            contype = part.get_content_type()           # caller renders html
            if contype == 'text/html':
                return contype, self.decodedPayload(part, asStr)

        # try any other text type, including XML
        for part in message.walk():
            if part.get_content_maintype() == 'text':
                return part.get_content_type(), self.decodedPayload(part, asStr)

        # punt: could use first part, but it's not marked as text
        failtext = '[No text to display]' if asStr else b'[No text to display]'
        return 'text/plain', failtext

    def decodeHeader(self, rawheader):
        """
        Return rawheader with MIME-encoded substrings decoded to str.

        Decoded substrings are joined with single spaces.  On any failure
        rawheader is returned unchanged.
        """
        try:
            parts = email.header.decode_header(rawheader)
            decoded = []
            for (part, enc) in parts:                   # for all substrings
                if enc is None:                         # part unencoded?
                    if not isinstance(part, bytes):     # str: full hdr unencoded
                        decoded += [part]
                    else:                               # else do unicode decode
                        decoded += [part.decode('raw-unicode-escape')]
                else:
                    decoded += [part.decode(enc)]
            return ' '.join(decoded)
        except Exception:
            return rawheader                            # punt!

    def decodeAddrHeader(self, rawheader):
        """
        Return an address header with each name part decoded.

        rawheader may hold several comma-separated addresses; only the
        display names are decoded.  If the header cannot be split, the
        whole string goes through decodeHeader instead.
        """
        try:
            pairs = email.utils.getaddresses([rawheader])   # split addrs and parts
            decoded = []                                    # handles name commas
            for (name, addr) in pairs:
                try:
                    name = self.decodeHeader(name)          # email+MIME+Uni
                except Exception:
                    name = None     # but uses encoded name if exc in decodeHeader
                joined = email.utils.formataddr((name, addr))   # join parts
                decoded.append(joined)
            return ', '.join(decoded)                       # >= 1 addrs
        except Exception:
            return self.decodeHeader(rawheader)     # try decoding entire string

    def splitAddresses(self, field):
        """
        Split a comma-separated address field into 'name <addr>' strings.

        Returns a list of formatted addresses, or '' if parsing fails
        (e.g. a syntax error in a user-entered field).
        """
        try:
            pairs = email.utils.getaddresses([field])                   # [(name,addr)]
            return [email.utils.formataddr(pair) for pair in pairs]     # [name <addr>]
        except Exception:
            return ''

    # returned when parses fail
    errorMessage = Message()
    errorMessage.set_payload('[Unable to parse message - format error]')

    def parseHeaders(self, mailtext):
        """Parse only the headers of mailtext; errorMessage on failure."""
        try:
            return email.parser.Parser().parsestr(mailtext, headersonly=True)
        except Exception:
            return self.errorMessage

    def parseMessage(self, fulltext):
        """Parse a full message text; errorMessage on failure."""
        try:
            return email.parser.Parser().parsestr(fulltext)     # may fail!
        except Exception:
            return self.errorMessage    # or let call handle? can check return

    def parseMessageRaw(self, fulltext):
        """Parse with email.parser.HeaderParser; errorMessage on failure."""
        try:
            return email.parser.HeaderParser().parsestr(fulltext)
        except Exception:
            return self.errorMessage
# -----------------------------------------------usercase.py
# Interactive self-test of the mailtools package: sends one message to
# mailconfig.myaddress, then fetches, prints and parses mail from the
# configured POP account.
import sys
sys.path.append('..')
import mailconfig
print('config:', mailconfig.__file__)

from mailtools import (MailFetcherConsole, MailSender,
                       MailSenderAuthConsole, MailParser)

# pick the sender class by whether the SMTP server needs a login
if not mailconfig.smtpuser:
    sender = MailSender(tracesize=5000)
else:
    sender = MailSenderAuthConsole(tracesize=5000)

sender.sendMessage(From=mailconfig.myaddress,
                   To=[mailconfig.myaddress],
                   Subj='testing mailtools package',
                   extrahdrs=[('X-Mailer', 'mailtools')],
                   bodytext='Here is my source code\n',
                   attaches=['selftest.py'],
                   )
                   # bodytextEncoding='utf-8',          # other tests to try
                   # attachesEncodings=['latin-1'],     # inspect text headers
                   # attaches=['monkeys.jpg'])          # verify Base64 encoded
                   # to='i18n addr list...',            # test mime/unicode headers

fetcher = MailFetcherConsole()


def status(*args):
    """Progress callback: print whatever the fetcher reports."""
    print(args)


# show the first five headers, loading a full message on request
hdrs, sizes, loadedall = fetcher.downloadAllHeaders(status)
for num, hdr in enumerate(hdrs[:5]):
    print(hdr)
    if input('load mail?') in ['y', 'Y']:
        print(fetcher.downloadMessage(num+1).rstrip(), '\n', '-'*70)

# load the last five messages; message numbers start at 1, so clamp
# for inboxes holding fewer than five
last5 = max(1, len(hdrs)-4)
msgs, sizes, loadedall = fetcher.downloadAllMessages(status, loadfrom=last5)
for msg in msgs:
    print(msg[:200], '\n', '-'*70)

parser = MailParser()
for i in [0]:                       # try [0 , len(msgs)]
    if i >= len(msgs):              # empty inbox: nothing to parse
        break
    fulltext = msgs[i]
    message = parser.parseMessage(fulltext)
    ctype, maintext = parser.findMainText(message)
    print('Parsed:', message['Subject'])
    print(maintext)
input('Press Enter to exit')        # pause if clicked on Windows