为什么我的 Python 脚本没有像预期的那样通过 try / except 语句自动重新连接到数据库?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (2)
  • 关注 (0)
  • 查看 (154)

下面是我的一个 Python 脚本,它会尝试连接到一个 SQL 数据库。该数据库存在一些连接问题,我希望脚本在连接失败时能够尝试重新连接,所以我添加了 try / except。但是,加上 try / except 语句之后,脚本仍然不会自动重新连接。有人能帮我让这个脚本自动重新连接到数据库吗?

代码

from __future__ import print_function

try:
    import psycopg2
except ImportError:
    # Re-raise with an actionable install hint (wrapped in ANSI colour codes).
    # The raise already terminates the script, so no explicit exit call is
    # needed afterwards.
    raise ImportError('\n\033[33mpsycopg2 library missing. pip install psycopg2\033[1;m\n')

import re
import sys
import json
import pprint
import time

# Module-level output path.
# NOTE(review): connect_to_db() assigns its own local `outfilepath`, so this
# value is not read by the code shown here.
outfilepath = "crtsh_output/crtsh_flat_file"

# Connection parameters interpolated into the psycopg2 connection string.
DB_HOST = 'crt.sh'
DB_NAME = 'certwatch'
DB_USER = 'guest'

# Seconds to sleep after each domain lookup (passed to time.sleep()).
DELAY = 8

# Earlier module-level connection, left disabled; the connection is opened
# inside connect_to_db() instead.
#conn = psycopg2.connect("dbname={0} user={1} host={2}".format(DB_NAME, DB_USER, DB_HOST))
#cursor = conn.cursor()

def connect_to_db():
    """Look up certificates on crt.sh for every domain listed in ``forager.txt``.

    Opens one psycopg2 connection, then for each line of the input file runs
    the certificate query, pretty-prints the returned rows, appends them as
    JSON to ``crtsh1.json`` and sleeps ``DELAY`` seconds.

    NOTE(review): the single ``try``/``except`` wraps the connection *and* the
    whole loop, and its handler only prints the error. After the first
    failure the loop is abandoned and the function returns, so nothing in
    this function ever reconnects or retries.
    """
    filepath = 'forager.txt'
#    conn = psycopg2.connect("dbname={0} user={1} host={2}".format(DB_NAME, DB_USER, DB_HOST))
#    cursor = conn.cursor()
    with open(filepath) as fp:
        # Placeholder; replaced by the fetched rows on every iteration.
        unique_domains = ''
        try:
            # Connect once, before the loop; the connection is never re-opened
            # and never explicitly closed.
            conn = psycopg2.connect("dbname={0} user={1} host={2}".format(DB_NAME, DB_USER, DB_HOST))
            cursor = conn.cursor()
            for cnt, domain_name in enumerate(fp):
                print("Line {}: {}".format(cnt, domain_name))
                print(domain_name)
                # Drop the trailing newline before using the name as a query parameter.
                domain_name = domain_name.rstrip()

                # The domain is passed as a bound parameter (%s), not formatted
                # into the SQL text.
                # NOTE(review): the SELECT list is cut off in this listing (it
                # ends in "x5$"), so the statement is incomplete as shown.
                cursor.execute('''SELECT c.id, x509_commonName(c.certificate), x509_issuerName(c.certificate), x509_notBefore(c.certificate), x509_notAfter(c.certificate), x509_issuerName(c.certificate), x5$
                FROM certificate c, certificate_identity ci WHERE
                c.id= ci.certificate_id AND ci.name_type = 'dNSName' AND lower(ci.name_value) =
                lower(%s) AND x509_notAfter(c.certificate) > statement_timestamp()''', (domain_name,))


                unique_domains = cursor.fetchall()

                pprint.pprint(unique_domains)

                # Local name; shadows the module-level `outfilepath`.
                outfilepath = "crtsh1" + ".json"
                # Append mode: each domain's rows are written as a separate
                # JSON document one after another in the same file.
                with open(outfilepath, 'a') as outfile:
                        outfile.write(json.dumps(unique_domains, sort_keys=True, indent=4, default=str, ensure_ascii = False))
                # Pause between lookups.
                time.sleep(DELAY)

        # Catches any error from connecting, querying or writing; the error is
        # printed and the remaining domains are skipped.
        except Exception as error:
        #    print("\n\033[1;31m[!] Unable to connect to the database\n\033[1;m")
            print(str(error))

if __name__ == "__main__":
    connect_to_db()
提问于
用户回答回答于

创建一个包装器类。并向类添加助手方法。

import MySQLdb

class DB:
  """Small wrapper around a MySQLdb connection that reconnects on demand.

  The connection is opened lazily on the first query. If a query fails with
  ``MySQLdb.OperationalError``, the wrapper opens a fresh connection and
  retries the statement once; a second failure propagates to the caller.
  """

  # Current connection, or None until connect() has been called.
  conn = None

  def __init__(self, **connect_kwargs):
    """Remember the keyword arguments to pass to ``MySQLdb.connect``.

    ``DB()`` with no arguments still works and calls ``MySQLdb.connect()``
    without arguments, as before.
    """
    self.connect_kwargs = connect_kwargs

  def connect(self):
    """Open a new connection and store it in ``self.conn``."""
    self.conn = MySQLdb.connect(**self.connect_kwargs)

  def query(self, sql, params=None):
    """Execute *sql* and return the cursor holding its result.

    Args:
      sql: SQL statement; use ``%s`` placeholders when *params* is given.
      params: optional sequence or mapping of values bound to the
        placeholders by the driver. Prefer this over formatting values
        into *sql* by hand.

    Raises:
      MySQLdb.OperationalError: if the statement also fails after one
        reconnect.
    """
    # Explicit lazy connect instead of relying on AttributeError from
    # `None.cursor()`, which could also hide unrelated attribute bugs.
    if self.conn is None:
      self.connect()
    try:
      cursor = self.conn.cursor()
      cursor.execute(sql, params)
    except MySQLdb.OperationalError:
      # Reconnect and retry exactly once.
      self.connect()
      cursor = self.conn.cursor()
      cursor.execute(sql, params)
    return cursor

用法如下:

db = DB()
# Named `row_id` rather than `id` so the builtin id() is not shadowed.
row_id = 9
# Formatting a value into SQL text is only acceptable for trusted values such
# as this literal; never do it with user-supplied input.
sql = "SELECT * FROM foo where id = {}".format(row_id)
cur = db.query(sql)
用户回答回答于

你可以尝试这样的方法。

def _close_quietly(conn):
    """Close *conn* if there is one, ignoring errors from a broken link."""
    if conn is None:
        return
    try:
        conn.close()
    except psycopg2.Error:
        # The connection is being discarded anyway; a failed close is not actionable.
        pass


def connect_to_db(tries=0):
    """Query crt.sh for every domain in ``forager.txt``, reconnecting on failure.

    For each line of the input file the certificate query is executed, the
    rows are pretty-printed and appended as JSON to ``crtsh1.json``, and the
    function sleeps ``DELAY`` seconds before the next domain.

    When the connection or a query fails with a psycopg2 connection-type
    error, the broken connection is dropped, the function waits one second,
    reconnects and repeats the query for the *same* domain. Domains that were
    already processed are not queried or written again.

    Args:
        tries: number of retries already used up (kept for compatibility
            with callers of the earlier recursive version). At most 3
            consecutive retries are made; the counter resets to 0 after
            every successful query.

    Raises:
        psycopg2.OperationalError, psycopg2.InterfaceError: when a query
            still fails after the retry budget is exhausted.
    """
    filepath = 'forager.txt'
    outfilepath = "crtsh1" + ".json"
    max_tries = 3
    # NOTE(review): the SELECT list is cut off in the original listing (it ends
    # in "x5$"); the text is kept verbatim and must be completed before use.
    query = '''SELECT c.id, x509_commonName(c.certificate), x509_issuerName(c.certificate), x509_notBefore(c.certificate), x509_notAfter(c.certificate), x509_issuerName(c.certificate), x5$
                FROM certificate c, certificate_identity ci WHERE
                c.id= ci.certificate_id AND ci.name_type = 'dNSName' AND lower(ci.name_value) =
                lower(%s) AND x509_notAfter(c.certificate) > statement_timestamp()'''

    attempts = tries
    conn = None
    cursor = None
    try:
        with open(filepath) as fp:
            for cnt, domain_name in enumerate(fp):
                print("Line {}: {}".format(cnt, domain_name))
                print(domain_name)
                domain_name = domain_name.rstrip()

                # Retry loop for this one domain only.
                while True:
                    try:
                        if conn is None:
                            conn = psycopg2.connect("dbname={0} user={1} host={2}".format(DB_NAME, DB_USER, DB_HOST))
                            cursor = conn.cursor()
                        cursor.execute(query, (domain_name,))
                        unique_domains = cursor.fetchall()
                    except (psycopg2.OperationalError, psycopg2.InterfaceError):
                        if attempts >= max_tries:
                            # Out of retries; the finally clause closes the connection.
                            raise
                        attempts += 1
                        # Drop the broken connection so the next pass reconnects.
                        _close_quietly(conn)
                        conn = None
                        cursor = None
                        time.sleep(1)  # give the DB a moment to recover
                    else:
                        attempts = 0
                        break

                pprint.pprint(unique_domains)

                with open(outfilepath, 'a') as outfile:
                    outfile.write(json.dumps(unique_domains, sort_keys=True, indent=4, default=str, ensure_ascii=False))
                time.sleep(DELAY)
    finally:
        _close_quietly(conn)

扫码关注云+社区

领取腾讯云代金券