首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将记录列表文件夹存储在Google中

将记录列表文件夹存储在Google中
EN

Code Review用户
提问于 2013-02-11 22:26:37
回答 1查看 637关注 0票数 3

我编写了一个脚本,使用Django ORM存储“配置设置”,并与Google和另一个API交互。该脚本在Google中为记录列表创建一个基本项目文件夹和子文件夹,并将链接发布回源数据库。我是Python的初学者,所以我不确定我做的是正确的,还是完全糟糕的。

我认为有些地方是可以清理的--例如,我是如何在特定方法中反复创建'client‘和'service’对象的。但是,我不确定它是否能够创建对象并在方法之间传递它们。我还可以使用Google的“批处理”和“部分响应”方法来优化HTTP请求。

此外,我需要让这个东西在一个周期性的任务或crontab。我对尝试芹菜很感兴趣,所以我一开始做了下面的工作,他们创造了客户,但我不认为它在芹菜中起作用。

我删除了一些脚本,并评论了一些地方,以使它更容易理解。

代码语言:javascript
运行
复制
# Celery tasks
@periodic_task(run_every=crontab(hour="*", minute="*", day_of_week="*"))
def sync_folders():
    configs = Configuration.objects.all()
    for config in configs:
        user = config.user
        client = create_client(user, config)
        try:
            create_document_folders(config, client)
        except Exception as inst:
            pass

@periodic_task(run_every=crontab(hour="*", minute="5", day_of_week="*"))
def sync_files():
    configs = Configuration.objects.all()
    for config in configs:
        user = config.user
        client = create_client(user, config)
        try:
            get_updated_files(config, client)
        except Exception as inst:
            pass

# Rest of post is the script initiated by celery tasks above. 
def create_client(user, config):
    # Create the QuickBase Client object
    s = config
    if s is not None:
        if s.qb_realm is not None:
            if s.qb_realm != 'www':
                baseurl = 'https://' + s.qb_realm + '.quickbase.com'
            else:
                baseurl = 'https://www.quickbase.com'
        else:
            baseurl = 'https://www.quickbase.com'
        client = quickbase.Client(QUICKBASE_USER, QUICKBASE_PASSWORD, base_url=baseurl)
        return client

def get_jobs_wo_folders(config, client):
    if client is not None:
        query = "{'" + config.root_folder_id + "'.EX.''}"
        clist = [config.root_folder_id, config.root_folder_name_fid, config.root_folder_user, '3']
        records = client.do_query(query, columns=clist, database=config.qb_root_dbid)
        return records

def retrieve_specific_files(service, param):
    # Search and retrieve a list of file resources passing a params object
    result = []
    page_token = None
    while True:
        try:
            if page_token:
                param['pageToken'] = page_token
            files = service.files().list(**param).execute()
            result.extend(files['items'])
            page_token = files.get('nextPageToken')
            if not page_token:
                break
        except errors.HttpError, error:
            print 'An error occurred: %s' % error
            break
    return result

def get_csv(list):
    si = cStringIO.StringIO()
    cw = csv.writer(si)
    cw.writerows(list)
    return si.getvalue().strip('\r\n')

# Google helper functions
def refresh_access_token(user, config):
    instance = UserSocialAuth.objects.get(user=user, provider='google-oauth2')
    instance.refresh_token()
    # Configuration model has a field 'last_refresh_time' that is auto-now date/time
    config.save()
    return instance.extra_data['access_token']

def get_access_token(config):
    try:
        access_token = UserSocialAuth.objects.get(user=config.user, provider='google-oauth2').extra_data['access_token']
        return access_token
    except Exception as inst:
        pprint(inst)

def create_drive_service(config):
"""
Creates a drive service using the 'Configuration' object's
related user. Users initial signup is done with django-socialauth;
the access token and refresh token is saved with django-socialauth &
refresh token handling is done with django-socialauth.
"""
    c = config
    user = config.user
    instance = UserSocialAuth.objects.get(user=user, provider='google-oauth2')
    refreshed_at = c.token_last_refresh
    now = datetime.now()
    expires_in =  instance.extra_data['expires']
    token_age = (now - refreshed_at).seconds
    if token_age > (expires_in - 120):
        access_token = refresh_access_token(user, config)
    else:
        access_token = instance.extra_data['access_token']

    try:
        credentials = AccessTokenCredentials(access_token, 'Python-urllib2/2.7')
        http = httplib2.Http()
        http = credentials.authorize(http)
        return build('drive', 'v2', http=http)
    except Exception as e:
        pprint(e)
    # Are these next lines pointless? What should I do if the tokens consistently fail?
        access_token = refresh_access_token(user)
        http = httplib2.Http()
        http = credentials.authorize(http)
        return build('drive', 'v2', http=http)

def insert_permission(service, file_id, value, perm_type, role):
    new_permission = {
    'value': value,
    'type': perm_type,
    'role': role,
    }
    try:
        return service.permissions().insert(
            fileId=file_id, body=new_permission, sendNotificationEmails=False).execute()
    except errors.HttpError, error:
        print 'An error occured: %s' % error
    return None

def insert_folder(config, title, parent_idNone, writer=None, mime_type='application/vnd.google-apps.folder'):
    service = create_drive_service(config)
    owner_permission = {
        'role': 'owner',
        'type': 'user',
        'value': 'me',
        }
    body = {
        'title': title,
        'mimeType': mime_type,
        'userPermission': owner_permission,
        'fields': 'items, items/parents',
        }
    if parent_id:
        body['parents'] = [{'id': parent_id}]
    try:
        folder = service.files().insert(
            body=body,
        ).execute()
        insert_permission(service, folder['id'], writer, 'user', 'writer')
        return folder
    except errors.HttpError, error:
        print 'An error occured: %s' % error
        return None

def create_document_folders(config, client):
    """
    Gets records that do not have Google drive folders, 
    and loops through the jobs creating the necessary
    base project folder and subfolders.
    """
    s = config
    records = get_jobs_wo_folders(config, client)
    """
A bunch of logic goes below this to loop through the list of records,
create the new folders, and append the new folder metadata to a the 'rows' list
which is later converted to records_csv
"""
if rows:
        records_csv = get_csv(rows)
    # Create the records in QuickBase
        response = client.import_from_csv(records_csv, clist=folders_clist, database=s.folders_dbid)
EN

回答 1

Code Review用户

发布于 2013-02-12 01:48:33

代码语言:javascript
运行
复制
# Celery tasks
@periodic_task(run_every=crontab(hour="*", minute="*", day_of_week="*"))
def sync_folders():
    configs = Configuration.objects.all()
    for config in configs:
        user = config.user
        client = create_client(user, config)

我会这么做:create_client(config.user, config)

代码语言:javascript
运行
复制
        try:
            create_document_folders(config, client)
        except Exception as inst:
            pass

不要这样做。你抓住一个异常并忽略它。你根本不知道哪里出了问题。您至少应该打印出异常,或者将其放入日志中,或者其他什么东西,这样您就可以知道出了什么问题。

代码语言:javascript
运行
复制
@periodic_task(run_every=crontab(hour="*", minute="5", day_of_week="*"))
def sync_files():
    configs = Configuration.objects.all()
    for config in configs:
        user = config.user
        client = create_client(user, config)
        try:
            get_updated_files(config, client)
        except Exception as inst:
            pass

又是同样的功能。重构它们并将get_updated_filescreate_document_folder作为参数传递给新函数。

代码语言:javascript
运行
复制
# Rest of post is the script initiated by celery tasks above. 
def create_client(user, config):
    # Create the QuickBase Client object
    s = config

为什么?只需使用配置即可。S唯一的优势是它很难读懂。

代码语言:javascript
运行
复制
    if s is not None:

您真的想支持配置为None吗?

代码语言:javascript
运行
复制
        if s.qb_realm is not None:
            if s.qb_realm != 'www':
                baseurl = 'https://' + s.qb_realm + '.quickbase.com'

一般来说,使用字符串格式比串联更好。

代码语言:javascript
运行
复制
            else:
                baseurl = 'https://www.quickbase.com'

为什么这是个特例?它只是做同样的事情,上面的情况会这样做。

代码语言:javascript
运行
复制
        else:
            baseurl = 'https://www.quickbase.com'


        client = quickbase.Client(QUICKBASE_USER, QUICKBASE_PASSWORD, base_url=baseurl)
        return client

把这两条线结合起来

代码语言:javascript
运行
复制
def get_jobs_wo_folders(config, client):
    if client is not None:

在输入错误的情况下,让一个函数默默地不能完成它的工作是个坏主意。你应该抛出一个异常。我甚至不会费心去检查客户端是否没有,而只是让它在尝试使用它时失败。

代码语言:javascript
运行
复制
        query = "{'" + config.root_folder_id + "'.EX.''}"
        clist = [config.root_folder_id, config.root_folder_name_fid, config.root_folder_user, '3']
        records = client.do_query(query, columns=clist, database=config.qb_root_dbid)
        return records

def retrieve_specific_files(service, param):
    # Search and retrieve a list of file resources passing a params object
    result = []
    page_token = None
    while True:
        try:
            if page_token:

使用is not None进行无检查,仅仅是因为其他一些随机事件最终是假的,而且这样做更显着。

代码语言:javascript
运行
复制
                param['pageToken'] = page_token
            files = service.files().list(**param).execute()
            result.extend(files['items'])
            page_token = files.get('nextPageToken')
            if not page_token:
                break
        except errors.HttpError, error:
            print 'An error occurred: %s' % error
            break

如果有错误,您真的想继续吗?你不应该放弃这次尝试吗?

代码语言:javascript
运行
复制
    return result

def get_csv(list):

避免将列表作为变量名,因为它是内置python类型。

代码语言:javascript
运行
复制
    si = cStringIO.StringIO()
    cw = csv.writer(si)
    cw.writerows(list)
    return si.getvalue().strip('\r\n')

# Google helper functions
def refresh_access_token(user, config):
    instance = UserSocialAuth.objects.get(user=user, provider='google-oauth2')
    instance.refresh_token()
    # Configuration model has a field 'last_refresh_time' that is auto-now date/time
    config.save()
    return instance.extra_data['access_token']

def get_access_token(config):
    try:
        access_token = UserSocialAuth.objects.get(user=config.user, provider='google-oauth2').extra_data['access_token']

你刚刚做了一些非常相似的事情,这意味着你应该提取一个共同的函数

代码语言:javascript
运行
复制
        return access_token
    except Exception as inst:
        pprint(inst)

至少在这里你打印了例外。

代码语言:javascript
运行
复制
def create_drive_service(config):
"""
Creates a drive service using the 'Configuration' object's
related user. Users initial signup is done with django-socialauth;
the access token and refresh token is saved with django-socialauth &
refresh token handling is done with django-socialauth.
"""
    c = config
    user = config.user

为什么有时使用config.user,而其他时候则将用户作为参数传递?

代码语言:javascript
运行
复制
    instance = UserSocialAuth.objects.get(user=user, provider='google-oauth2')

避免像实例这样的通用名称

代码语言:javascript
运行
复制
    refreshed_at = c.token_last_refresh
    now = datetime.now()
    expires_in =  instance.extra_data['expires']
    token_age = (now - refreshed_at).seconds
    if token_age > (expires_in - 120):
        access_token = refresh_access_token(user, config)

但是你已经有了,所以这是额外的工作

代码语言:javascript
运行
复制
    else:
        access_token = instance.extra_data['access_token']

    try:
        credentials = AccessTokenCredentials(access_token, 'Python-urllib2/2.7')
        http = httplib2.Http()
        http = credentials.authorize(http)
        return build('drive', 'v2', http=http)
    except Exception as e:
        pprint(e)
    # Are these next lines pointless? What should I do if the tokens consistently fail?
        access_token = refresh_access_token(user)

您不需要这样做,因为您刚刚刷新了上面的令牌。

代码语言:javascript
运行
复制
        http = httplib2.Http()
        http = credentials.authorize(http)
        return build('drive', 'v2', http=http)

def insert_permission(service, file_id, value, perm_type, role):
    new_permission = {
    'value': value,
    'type': perm_type,
    'role': role,
    }
    try:
        return service.permissions().insert(
            fileId=file_id, body=new_permission, sendNotificationEmails=False).execute()
    except errors.HttpError, error:
        print 'An error occured: %s' % error
    return None


def insert_folder(config, title, parent_idNone, writer=None, mime_type='application/vnd.google-apps.folder'):
    service = create_drive_service(config)
    owner_permission = {
        'role': 'owner',
        'type': 'user',
        'value': 'me',
        }
    body = {
        'title': title,
        'mimeType': mime_type,
        'userPermission': owner_permission,
        'fields': 'items, items/parents',
        }
    if parent_id:
        body['parents'] = [{'id': parent_id}]
    try:
        folder = service.files().insert(
            body=body,
        ).execute()
        insert_permission(service, folder['id'], writer, 'user', 'writer')
        return folder
    except errors.HttpError, error:
        print 'An error occured: %s' % error
        return None

def create_document_folders(config, client):
    """
    Gets records that do not have Google drive folders, 
    and loops through the jobs creating the necessary
    base project folder and subfolders.
    """
    s = config
    records = get_jobs_wo_folders(config, client)
    """
A bunch of logic goes below this to loop through the list of records,
create the new folders, and append the new folder metadata to a the 'rows' list
which is later converted to records_csv
"""
if rows:
        records_csv = get_csv(rows)
    # Create the records in QuickBase
        response = client.import_from_csv(records_csv, clist=folders_clist, database=s.folders_dbid)
票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/21597

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档