前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SCF云函数API实践

SCF云函数API实践

原创
作者头像
Vapour
发布2023-07-04 13:21:37
7040
发布2023-07-04 13:21:37
举报

前言

SCF云函数对于我来说真的是非常好用,原先部署在服务器上的一些处理数据的函数可以直接交付给SCF来处理,省了很多服务器的费用。现在腾讯云可以免费试用SCF个人高级版三个月https://cloud.tencent.com/act/free?from=20876,强力推荐!!

在以往想部署一些简单的项目,需要经历服务器的购买、镜像选配、环境搭建、项目部署等一系列操作,而现在直接用SCF就不需要去操劳这些,直接注重业务逻辑,也不需要考虑并发扩容等性能问题。若考虑日志直接搭配CLS生成各种各样的表图,方便后期分析数据日志。

SCF控制台的界面简洁明了,操作指引非常细致,且有很多示例很容易上手。在这里必须要夸一下SCF的相关工程师,解决高并发复制出错的BUG非常快,以及提的SCF相关文档的建议也都采纳了,非常NICE!!

本篇文章主要讲述如何使用API来批量操作SCF。

前置步骤

首先需要导入以下包:

代码语言:python
复制
import json
import time
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.scf.v20180416 import scf_client, models
import threading
import queue
from tqdm import tqdm

用户基础信息配置,这个部分主要配置用户API密钥(这里获取https://console.cloud.tencent.com/cam/capi)以及地域:

代码语言:python
复制
# API密钥
cred = credential.Credential("SecretId", "SecretKey")
httpProfile = HttpProfile()
httpProfile.endpoint = "scf.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 地域,这里以北京为例
client = scf_client.ScfClient(cred, "ap-beijing", clientProfile)

API方法

接下来是涉及的一些方法,涉及函数生成、删除、异步配置、通用配置、触发器以及函数信息的获取方便后面判定函数状态是否适合操作。

代码语言:python
复制
# 生成scf函数
def copy_scf_function(scf_name, target_namespace, source_scf_name, source_namespace):
    try:
        req = models.CopyFunctionRequest()
        params = {
            "FunctionName": source_scf_name,
            "NewFunctionName": scf_name,
            "Namespace": source_namespace,
            "TargetNamespace": target_namespace,
            "Description": f'from_{source_scf_name}',
            "CopyConfiguration": True
        }
        req.from_json_string(json.dumps(params))
        resp = client.CopyFunction(req)
        # print('normal:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print('err:', err)
        pass


# 删除函数
def delete_scf_function(scf_name, Namespace):
    try:
        req = models.DeleteFunctionRequest()
        params = {
            "FunctionName": scf_name,
            "Namespace": Namespace
        }
        req.from_json_string(json.dumps(params))
        resp = client.DeleteFunction(req)
        # print('删除函数:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 修改函数异步配置
def modify_scf_async_settings(scf_name, Namespace, RetryNum=0):
    try:
        req = models.UpdateFunctionEventInvokeConfigRequest()
        params = {
            "AsyncTriggerConfig": {
                "RetryConfig": [
                    {
                        "RetryNum": RetryNum
                    }
                ],
                "MsgTTL": 21600
            },
            "FunctionName": scf_name,
            "Namespace": Namespace
        }
        req.from_json_string(json.dumps(params))
        resp = client.UpdateFunctionEventInvokeConfig(req)
        print('修改异步配置:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 修改函数配置
def modify_scf_function_settings(scf_name, Namespace, MemorySize=64):
    try:
        req = models.UpdateFunctionConfigurationRequest()
        params = {
            "FunctionName": scf_name,
            "MemorySize": MemorySize,
            "Timeout": 120,
            "Namespace": Namespace,
            "Environment": {
                "Variables": [
                    {
                        "Key": "TZ",
                        "Value": "Asia/Shanghai"  # 设置时区
                    },
                    {
                        "Key": "SEVER_NAME",   # 这里将函数名也作为环境变量
                        "Value": scf_name
                    }
                ]
            },
            "Layers": [
                {
                    "LayerName": "py", # 配置层
                    "LayerVersion": 1  # 曾版本
                }
            ]
        }
        req.from_json_string(json.dumps(params))
        resp = client.UpdateFunctionConfiguration(req)
        # print('修改函数配置:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 配置触发器
def configue_trigger(scf_name, Namespace, timer='1 * * * * * *', enable='CLOSE', TriggerName='python_auto_gen'):
    try:
        req = models.CreateTriggerRequest()
        params = {
            "FunctionName": scf_name,
            "TriggerName": TriggerName,
            "Type": "timer",
            "TriggerDesc": timer,
            "Namespace": Namespace,
            "Qualifier": "$DEFAULT",
            "Enable": enable  # OPEN CLOSE
        }
        req.from_json_string(json.dumps(params))
        resp = client.CreateTrigger(req)
        # print('创建触发器:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 删除触发器
def delete_trigger(scf_name, Namespace, TriggerName="python_auto_gen"):
    try:
        req = models.DeleteTriggerRequest()
        params = {
            "FunctionName": scf_name,
            "TriggerName": TriggerName,
            "Namespace": Namespace,
            "Type": "timer",
            "Qualifier": "$DEFAULT"
        }
        req.from_json_string(json.dumps(params))
        # print(req)
        resp = client.DeleteTrigger(req)
        # print('删除触发器:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 获取scf信息
def get_scf_info(Namespace, SearchKey):
    try:
        req = models.ListFunctionsRequest()
        params = {
            "Order": "ASC",
            "Limit": 50,
            "Namespace": Namespace,
            "SearchKey": SearchKey
        }
        req.from_json_string(json.dumps(params))
        resp = client.ListFunctions(req)
        # print(resp.to_json_string())
        return resp.to_json_string()
    except TencentCloudSDKException as err:
        # print(err)
        return None

整合SCF类

基础的一些方法写好后可以进行批量操作,在下面的这个类中可以实现一键配置函数以及相关配置的操作,由于代码难度不大我就放在一起了。

代码语言:python
复制
class SCF:
    def __init__(self, NameSpace, timer='1 * * * * * *', start=1, end=50, memsize=64, triggername='python_auto_gen'):
        self.ns_list = ['SCF_0_', 'SCF_1_', 'SCF_2_', 'SCF_3_', 'SCF_4_']
        self.NameSpace = NameSpace
        self.triggername = triggername
        self.memsize = memsize
        self.start = start
        self.end = end
        self.ns_prefix = None
        self.timer = timer
        if self.NameSpace == 'default':
            self.ns_prefix = self.ns_list[0]
            # print('NameSpace:', self.NameSpace)
        else:
            for i in self.ns_list:
                if NameSpace in i:
                    self.ns_prefix = i
                    # print('命名空间:', self.NameSpace, '\n前缀:', self.ns_prefix)
                    # print('NameSpace:', self.NameSpace)
                    break
        if not self.ns_prefix:
            raise '配置错误啦!请确认正确后再试!!!'
        self.pbar = None
        self.delete_bar = None
        self.modify = None
        self.trigger = None
        self.delete_trigger = None

    def get_scf_name_list(self):
        scf_name_list = []
        for i in range(self.start, self.end + 1):
            if i <= 9:
                scf_name_list_sub = '0' + str(i)
            else:
                scf_name_list_sub = str(i)
            scf_name_list_sub = self.ns_prefix + scf_name_list_sub
            scf_name_list.append(scf_name_list_sub)
        return scf_name_list

    def fast_modify_trigger(self, scf_name):
        if isinstance(scf_name, list):
            self.delete_trigger = tqdm(total=len(scf_name), desc="Tragger Deleting...", unit="it", colour='MAGENTA')
            q = queue.Queue()
            for i in scf_name:
                q.put(i)
            threads = []
            for th in range(1, 9):
                threads.append(
                    threading.Thread(target=self.__fdt, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.delete_trigger.desc = 'Tragger Deleted!'
            self.delete_trigger.close()
            self.fast_configue_trigger(scf_name)
        else:
            delete_trigger(scf_name, self.NameSpace)
            configue_trigger(scf_name, self.NameSpace, self.trigger, self.timer)

    def __fdt(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                scf_name = q.get()
                delete_trigger(scf_name, self.NameSpace, TriggerName=self.triggername)  # 删除触发器
                self.delete_trigger.update(1)

    def fast_copy_scf_function(self, scf_name, source_scf_name, source_scf_namespace):
        if isinstance(scf_name, list):
            self.pbar = tqdm(total=len(scf_name), desc=f"{self.NameSpace}: SCF Copying...", unit="it", colour='MAGENTA')
            q = queue.Queue()
            for i in scf_name:
                q.put([i, source_scf_name, source_scf_namespace])
            threads = []
            for th in range(1, 21):
                threads.append(
                    threading.Thread(target=self.__csf, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.pbar.desc = f"{self.NameSpace}: SCF Copy Complete!"
            self.pbar.close()
            self.fast_modify_scf_function_settings(scf_name)
        else:
            copy_scf_function(scf_name, self.NameSpace, source_scf_name, source_scf_namespace)

    def __csf(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                target = q.get()
                copy_scf_function(target[0], self.NameSpace, target[1], target[2])
                # self.pbar.update(1)
                time.sleep(0.1)
                while True:
                    if self.scf_info(target[0]):
                        self.pbar.update(1)
                        break
                    else:
                        time.sleep(0.3)

    def fast_modify_scf_function_settings(self, scf_name):
        if isinstance(scf_name, list):
            q = queue.Queue()
            self.modify = tqdm(total=len(scf_name), desc=f"{self.NameSpace}: SCF Modifying...", unit="it",
                               colour='CYAN')
            for i in scf_name:
                q.put(i)
            threads = []
            for th in range(1, 20):
                threads.append(
                    threading.Thread(target=self.__msfs, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.modify.desc = f"{self.NameSpace}: SCF Modification Completed!"
            self.modify.close()
            self.fast_configue_trigger(scf_name)
        else:
            modify_scf_function_settings(scf_name, self.NameSpace)

    def __msfs(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                scf_name = q.get()
                while True:
                    if self.scf_exist(scf_name):
                        if self.scf_info(scf_name):
                            modify_scf_function_settings(scf_name, self.NameSpace, self.memsize)
                            self.modify.update(1)
                            break
                        else:
                            time.sleep(0.1)
                    else:
                        break

    def fast_delete_scf_function(self, scf_name):
        if isinstance(scf_name, list):
            q = queue.Queue()
            self.delete_bar = tqdm(total=len(scf_name), desc=f"{self.NameSpace}: Deleting Old SCF...", unit="it",
                                   colour='GREEN')
            for i in scf_name:
                q.put(i)
            threads = []
            for th in range(1, 15):
                threads.append(
                    threading.Thread(target=self.__dsf, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.delete_bar.desc = f"{self.NameSpace}: Delete SCF Completed!"
            self.delete_bar.close()
        else:
            if isinstance(scf_name, int):
                if scf_name <= 9:
                    delete_scf_function(f'{self.ns_prefix}0{scf_name}', self.NameSpace)
                else:
                    delete_scf_function(f'{self.ns_prefix}{scf_name}', self.NameSpace)
            else:
                delete_scf_function(scf_name, self.NameSpace)

    def __dsf(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                scf_name = q.get()
                delete_scf_function(scf_name, self.NameSpace)
                self.delete_bar.update(1)
                # while True:
                #     delete_scf_function(scf_name, self.NameSpace)
                #     if not self.scf_exist(scf_name):
                #         self.delete_bar.update(1)
                #         break
                #     time.sleep(0.1)

    def fast_configue_trigger(self, scf_name):
        if self.timer == 'reset':
            return
        if isinstance(scf_name, list):
            q = queue.Queue()
            self.trigger = tqdm(total=len(scf_name), desc=f"{self.NameSpace}: Trigger Configuring...", unit="个",
                                colour='YELLOW')
            for i in scf_name:
                q.put(i)
            threads = []
            for th in range(1, 20):
                threads.append(
                    threading.Thread(target=self.__fct, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.trigger.desc = f"{self.NameSpace}: Trigger Configured!"
            self.trigger.close()
        else:
            configue_trigger(scf_name, self.NameSpace)

    def __fct(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                scf_name = q.get()
                configue_trigger(scf_name, self.NameSpace, self.timer, enable='OPEN',
                                 TriggerName=self.triggername)  # 开启触发器
                # configue_trigger(scf_name, self.NameSpace, self.timer)  # 关闭触发器
                self.trigger.update(1)

    def scf_info(self, scf_name):
        res = get_scf_info(self.NameSpace, scf_name)
        try:
            json_data = json.loads(res)
            for i in json_data['Functions']:
                if i['FunctionName'] == scf_name and i['Status'] == 'Active':
                    return True
            return False
        except:
            print(res)
            return False

    def scf_exist(self, scf_name):
        json_data = json.loads(get_scf_info(self.NameSpace, scf_name))
        if json_data['Functions']:
            return True
        else:
            return False

    def modify_func_async(self, scf_name_list: list):
        for i in scf_name_list:
            modify_scf_async_settings(scf_name=i, Namespace=self.NameSpace)

使用方式

在上述的SCF类中实现对同一地域五个命名空间共250个函数同时操作的功能,但因为实际需要一个用于复制,所以是对249个函数进行操作,限于API的请求限制这里的多线程可以再优化一下数量。

接下来就可以初始化SCF传入必要的命名空间即可调用里面的方法啦。

代码语言:python
复制
tst = SCF(NameSpace='default', timer='1 * * * * * *', start=2, end=50, memsize=64, triggername='pyt0')

如果需要操作其他命名空间可以直接用SCF_1作为命名空间这样子就会自动生成SCF_1_01、SCF_1_02……,注意SCF_0是代表默认的default命名空间

在这里用SCF_1来命名命名空间,则可以写为

代码语言:python
复制
tst = SCF(NameSpace=' SCF_1', timer='1 * * * * * *', start=2, end=50, memsize=64, triggername='pyt1')

如果想根命名空间为default的SCF_0_01作为源函数来复制函数则可以写为:

代码语言:python
复制
tst.fast_copy_scf_function(tst.get_scf_name_list(), 'SCF_0_01', 'default')

操作完事可以再检查一下函数状态是否准备就绪:

代码语言:python
复制
def fast_get_scf_info(q: queue.Queue):
    while True:
        if q.empty():
            break
        else:
            info = q.get()
            json_data = json.loads(get_scf_info(info[0], info[1]))
            if not json_data['Functions']:
                print('函数异常:', info[0], info[1])

def check_status(ls: list):
    q = queue.Queue() 
	# 只需要将scf命名空间和scf名称放入即可
	for namespace, scf_name in ls:    
		q.put([namespace, scf_name])
	threads = []
	for th in range(1, 20):
	    threads.append(
	        threading.Thread(target=fast_get_scf_info, args=(q,))
	    )
	a = time.time()
	print('{:*^20}'.format('开始检查'))
	for thread in threads:
	    thread.start()
	for thread in threads:
	    thread.join()
	print('{:*^20}'.format('检查完成'))
	print('耗时:{:.3f}s'.format(time.time() - a))

总结

文章到这里就结束了,这个类中所涉及的参数并不是全部比如除timer以外的触发器都没有提及,配置的相关参数也没有全部放入,如果您要使用需要根据自身使用情况来进一步修改。最后感谢大家的支持,祝愿腾讯云越来越好呀!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 前置步骤
      • API方法
        • 整合SCF类
          • 使用方式
            • 总结
            相关产品与服务
            云 API
            云 API 是腾讯云开放生态的基石。通过云 API,只需少量的代码即可快速操作云产品;在熟练的情况下,使用云 API 完成一些频繁调用的功能可以极大提高效率;除此之外,通过 API 可以组合功能,实现更高级的功能,易于自动化, 易于远程调用, 兼容性强,对系统要求低。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档