前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >具有EC2自动训练的无服务器TensorFlow工作流程

具有EC2自动训练的无服务器TensorFlow工作流程

作者头像
代码医生工作室
发布2019-10-15 17:21:39
12.5K0
发布2019-10-15 17:21:39
举报
文章被收录于专栏:相约机器人相约机器人

作者 | Mike Moritz

来源 | Medium

编辑 | 代码医生团队

机器学习训练工作通常是时间和资源密集型的,因此将这一过程整合到实时自动化工作流程中可能会面临挑战。

尽管可以在Lambda上运行标准的Python TensorFlow库,但很可能许多应用程序很快会遇到部署包大小和/或执行时间的限制,或者需要其他计算选项。

本文将逐步介绍如何使数据管理和预测保持无服务器状态,但将训练工作加载到临时EC2实例。这种实例创建模式将基于为在云中运行具有成本效益的超参数优化而开发的一种模式。

将预测功能保留在Lambda中意味着由于加载TensorFlow而仍然可能存在大小限制。为了减轻这种情况,所有Lambda函数都将为Node.js编写,这也将允许使用TensorFlow.js而不是标准的Python库。

https://www.tensorflow.org/js

TensorFlow.js提供浏览器版本和Node版本,后者包含C ++绑定以提高性能。Node版本似乎是显而易见的选择,但是它可以解压缩为690MB,这使其立即不适合Lambda。鉴于我们不会在Lambda函数中进行训练,因此性能下降可以接受预测,因此将使用解压缩为55MB的浏览器版本。

对于基础的机器学习模型,将尝试基于以下输入参数来预测一个人的舒适度:

  • 温度(F)
  • 相对湿度 (%)
  • 衣物绝缘(以“ clo”为单位)
  • 风速(m / s)

实际模型将使用通过TensorFlow的Keras API构建的简单(非优化)神经网络。

对于数据存储,我们将在DynamoDB中创建两个表:

  • data —将保留带标签的输入数据进行训练
  • model —存储训练工作中的元数据和指标

环境设定

初始化

由于项目将与Node Lambda文件和Python EC2文件混合在一起,因此,将在文件夹结构中将它们分开,如下所示。还将利用Serverless框架,该框架将保留在顶层,而Node和Python部分将在各自的文件夹中初始化。

代码语言:javascript
复制
├── LambdaAutoTraining
│   ├── js
│   │   ├── ...
│   ├── py
│   │   ├── ...

首先,请安装Serverless并使用Node模板初始化一个新项目。应显示样板处理程序(handler.js)和配置文件(serverless.yml)。

代码语言:javascript
复制
$ npm install -g serverless
$ mkdir -p LambdaAutoTraining/{js,py}
$ cd LambdaAutoTraining
$ serverless create --template aws-nodejs

节点设置

导航到该js文件夹,初始化一个新的Node项目,然后安装Tensorflow.js(仅浏览器版本!)。

代码语言:javascript
复制
$ cd js
$ npm init
...follow prompts
$ npm install @tensorflow/tfjs

接下来,使用体系结构图作为指南,创建必要的JavaScript文件,这些文件将映射到最终的Lambda函数。

代码语言:javascript
复制
$ touch test.js upload.js train.js infer.js s3proxy.js

最后,将样板代码从复制handler.js到每个文件中,然后删除handler.js。

Python设置

导航到该py文件夹并创建一个新的虚拟环境。为了创建训练方案,将使用Jupyter笔记本,并且还将需要该tensorflowjs模块,以便可以将保存的模型转换为TensorFlow.js可以理解的格式。

代码语言:javascript
复制
$ cd ../py
$ pyenv virtualenv 3.7.3 autotraining
$ pyenv activate autotraining
$ pip install tensorflow tensorflowjs jupyter
$ pip freeze > requirements.txt

仅需要在本节中创建Jupyter笔记本文件和Dockerfile。Python文件将在Docker构建过程中创建。

代码语言:javascript
复制
$ touch train.ipynb Dockerfile

项目结构现在应如下所示:

代码语言:javascript
复制
├── LambdaAutoTraining
│   ├── serverless.yml
│   ├── js
│   │   ├── test.js
│   │   ├── upload.js
│   │   ├── train.js
│   │   ├── infer.js
│   │   ├── s3proxy.js
│   │   ├── package.json
│   │   ├── package_lock.json
|   │   ├── node_modules
|   │   │   ├── ...
│   ├── py
│   │   ├── requirements.txt
│   │   ├── train.ipynb
│   │   ├── Dockerfile

无服务器设置

该serverless.yml文件是项目的主要配置。首先删除文件中的所有样板文本(如果需要,可以稍后参考文档中的所有各种选项),然后开始构建提供程序部分。

与大多数AWSless Serverless示例的主要区别在于,将定义自己的IAM角色。通常role,该部分将替换为iamRoleStatements允许无服务器与其自己的整体IAM角色合并的自定义策略的部分。但是,需要将EC2包括为受信任的实体,而不能作为的一部分使用iamRoleStatements。稍后将在资源部分中对此进行构建。

环境部分使可以访问Lambda函数中与部署相关的变量。IAM_ROLE将需要创建EC2实例策略,并且API_URL两者都将使用它test.js并向infer.js的API Gateway端点进行调用。

代码语言:javascript
复制
service: lambdaautotraining
 
provider:
  name: aws
  runtime: nodejs10.x
 
  stage: dev
  region: us-east-1
  
  role: IamRole
 
  environment:
    SERVICE: ${self:service}
    REGION: ${self:provider.region}
    BUCKET: ${self:service}-${opt:stage, self:provider.stage}
    STAGE: ${opt:stage, self:provider.stage}
    DYNAMODB_TABLE_MODELS: ${self:service}-models-${opt:stage, self:provider.stage}
    DYNAMODB_TABLE_DATA: ${self:service}-data-${opt:stage, self:provider.stage}
    FUNCTION_PREFIX: ${self:service}-${opt:stage, self:provider.stage}-
    IAM_ROLE: ${self:service}-${opt:stage, self:provider.stage}-${self:provider.region}-managedrole
    API_URL:
      Fn::Join:
        - ""
        - - "https://"
          - Ref: "ApiGatewayRestApi"
          - ".execute-api.${self:provider.region}.amazonaws.com/${self:provider.stage}"
 
package:
  exclude:
    - py/**
    
# ...

接下来,使用图和创建的文件作为指南来定义每个功能。为简单起见,每个处理程序函数名称和API端点将与文件名相同。

upload,infer和s3proxy将通过API网关调用,因此将发生http事件。因为s3proxy将使用路径参数来定义所请求key的文件,并将其作为S3存储桶中的文件夹。

对于该train功能,将使用DynamoDB流触发器,该触发器将包含在资源部分中。当至少有一个新事件并且满足以下任一限制时,将触发此事件:

  • batchSize -创建的最大项目数
  • batchWindow —创建第一个项目后的最长时间

由于train将主要负责启动EC2实例,因此还将定义一些其他特定的环境变量。在此示例中,Docker映像将存储在AWS Docker注册表(ECR)中,但是也可以使用其他映像。

  • AMI_ID —在本示例中,我们将使用ami-0f812849f5bc97db5,因为它是为Docker预先构建的
  • KEY_NAME—这是SSH访问实例所需的pem文件的名称;确保您有权访问私钥!
  • INSTANCE_TYPE —有效值为该图像可用的EC2风格
  • SPOT_DURATION —竞价型实例被中断之前的最短时间(分钟)
  • VALID_HRS —如果未满足,现货请求将持续的最长时间
  • ECR_ID —应与您的AWS账户ID相同
  • ECR_REPO — ECR存储库和项目的名称

最后,test将仅用于手动触发,因此没有关联的事件。

代码语言:javascript
复制
# ...
 
functions:
  upload:
    handler: js/upload.upload
    events:
      - http:
          path: upload
          method: post
        
  train:
    handler: js/train.train
    environment:
      AMI_ID: ami-0f812849f5bc97db5
      KEY_NAME: ec2testing
      INSTANCE_TYPE: t2.micro
      SPOT_DURATION: 60
      VALID_HRS: 4
      ECR_ID: 530583866435
      ECR_REPO: lambda-auto-training/lambda-auto-training-${opt:stage, self:provider.stage}
    events:
      - stream:
          type: dynamodb
          arn:
            Fn::GetAtt: [DataDynamoDbTable, StreamArn]
          batchSize: 100
          batchWindow: 300
        
  infer:
    handler: js/infer.infer
    events:
      - http:
          path: infer
          method: post
          
  test:
    handler: js/test.test  
    
  s3proxy:
    handler: js/s3proxy.s3proxy
    events:
      - http:
          path: s3proxy/{key}/{filename}
          method: get
          
# ...  

接下来,创建S3存储桶和两个DynamoDB表(在此阶段配置的吞吐量有限)。请注意,该data表还包含StreamSpecification将用于触发train功能的。

代码语言:javascript
复制
# ...
 
resources:
 Resources:
    Bucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:provider.environment.BUCKET}
    ModelsDynamoDbTable:
      Type: 'AWS::DynamoDB::Table'
      DeletionPolicy: Retain
      Properties:
        AttributeDefinitions:
          -
            AttributeName: created
            AttributeType: N
        KeySchema:
          -
            AttributeName: created
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:provider.environment.DYNAMODB_TABLE_MODELS}
    DataDynamoDbTable:
      Type: 'AWS::DynamoDB::Table'
      DeletionPolicy: Retain
      Properties:
        AttributeDefinitions:
          -
            AttributeName: created
            AttributeType: N
        KeySchema:
          -
            AttributeName: created
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:provider.environment.DYNAMODB_TABLE_DATA}
        StreamSpecification:
          StreamViewType: NEW_AND_OLD_IMAGES
          
# ...  

创建的最终资源是自定义IAM角色,该功能将由所有功能使用,并且无服务器文档提供了一个很好的起点模板。为了将角色从Lambda转移到EC2,需要做两件事:

https://serverless.com/framework/docs/providers/aws/guide/iam#one-custom-iam-role-for-all-functions

  • 添加ec2.amazonaws.com到AssumeRolePolicyDocument部分
  • iam:PassRole在该Policies部分添加允许操作

在本Policies节中,将首先复制默认的无服务器策略以进行日志记录和S3部署存储桶(通常会自动创建这些策略)。接下来,将为之前定义的S3存储桶和DynamoDB表添加自定义语句。请注意,在创建自定义策略时,不会自动创建DynamoDB流策略,因此需要显式定义它。

此外,将添加创建EC2实例所需的策略:

  • EC2 —创建并运行实例。
  • CloudWatch —创建,描述和启用警报,以便可以在训练完成后自动终止实例。
  • ECR —允许提取Docker映像(仅EC2会使用,而不是Lambda函数使用)。
  • IAM —获取,创建角色并将其添加到实例配置文件。从控制台启动EC2实例并选择IAM角色时,会自动创建此配置文件,但是需要在功能内手动执行此操作。

安全说明:在部署到生产环境之前,应将这些策略的范围缩小到仅所需的资源

代码语言:javascript
复制
# ...
 
    IamRole:
      Type: AWS::IAM::Role
      Properties:
        Path: /
        RoleName: ${self:service}-${opt:stage, self:provider.stage}-${self:provider.region}-managedrole
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - lambda.amazonaws.com
                  - ec2.amazonaws.com # added for transfer in train.js
              Action: sts:AssumeRole
        Policies:
          - PolicyName: ${self:service}-${opt:stage, self:provider.stage}-${self:provider.region}-managedpolicy
            PolicyDocument:
              Version: '2012-10-17'
              Statement:
                # Begin default Serverless policies
                - Effect: Allow
                  Action:
                    - logs:CreateLogGroup
                    - logs:CreateLogStream
                    - logs:PutLogEvents
                  Resource:
                    - 'Fn::Join':
                      - ':'
                      -
                        - 'arn:aws:logs'
                        - Ref: 'AWS::Region'
                        - Ref: 'AWS::AccountId'
                        - 'log-group:/aws/lambda/*:*:*'
                -  Effect: "Allow"
                   Action:
                     - "s3:PutObject"
                   Resource:
                     Fn::Join:
                       - ""
                       - - "arn:aws:s3:::"
                         - "Ref" : "ServerlessDeploymentBucket"
                # Begin custom policies
                - Effect: Allow
                  Action:
                    - s3:*
                  Resource:
                   Fn::Join:
                     - ""
                     - - "arn:aws:s3:::"
                       - ${self:provider.environment.BUCKET}
                       - "/*"
                - Effect: Allow
                  Action:
                    - dynamodb:Query
                    - dynamodb:Scan
                    - dynamodb:GetItem
                    - dynamodb:PutItem
                    - dynamodb:UpdateItem
                    - dynamodb:DeleteItem
                    - dynamodb:GetRecords
                    - dynamodb:GetShardIterator
                    - dynamodb:DescribeStream
                    - dynamodb:ListStreams
                    - dynamodb:BatchWriteItem
                  Resource:
                    - "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.DYNAMODB_TABLE_MODELS}"
                    - "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.DYNAMODB_TABLE_DATA}"
                    - { Fn::GetAtt: [ DataDynamoDbTable, Arn ] }
                # For custom role stream policy must be explicitly defined
                - Effect: Allow
                  Action:
                    - dynamodb:GetRecords
                    - dynamodb:GetShardIterator
                    - dynamodb:DescribeStream
                    - dynamodb:ListStreams
                  Resource:
                    - "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.DYNAMODB_TABLE_DATA}/stream/*"
                - Effect: Allow
                  Action:
                    - ec2:RunInstances
                    - ec2:TerminateInstances
                    - cloudwatch:PutMetricAlarm
                    - cloudwatch:DescribeAlarms
                    - cloudwatch:EnableAlarmActions
                  Resource: "*"   
                # Allow for transferring role to EC2
                - Effect: Allow
                  Action:
                    - iam:PassRole
                    - iam:GetInstanceProfile
                    - iam:CreateInstanceProfile
                    - iam:AddRoleToInstanceProfile
                  Resource: "*"
                # Used only by EC2 when role is transfered  
                - Effect: Allow
                  Action:
                    - ecr:GetAuthorizationToken
                    - ecr:GetDownloadUrlForLayer
                    - ecr:BatchGetImage
                    - ecr:BatchCheckLayerAvailability
                  Resource:
                    - "*"

由于已经为每个功能添加了样板代码,因此现在可以部署并测试所有配置是否正确。

代码语言:javascript
复制
$ serverless deploy --stage dev
...
$ curl -X POST "https://<api_id>.execute-api.<region>.amazonaws.com/dev/upload"
Go Serverless v1.0! Your function executed successfully!

现在准备构建该应用程序!

Lambda:upload.js

该upload函数将新标记数据的数组作为输入,并将其存储在DynamoDB表中。然后,此更新将启动流触发器以启动该train功能。

在upload.js第一个导入和设置的AWS SDK。由于此功能是从HTTP事件触发的,因此将读取该body字段,然后构造一个代表单个DynamoDB插入项的对象数组。请注意,即使字段具有不同的类型(例如,数字和字符串分别为“ N”或“ S”),实际值也需要作为字符串传递。

代码语言:javascript
复制
'use strict';
 
// Load the SDK for JavaScript
const AWS = require('aws-sdk');
AWS.config.update({region: process.env.REGION});
const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'});
 
module.exports.upload = (event, context, callback) => {
  var data = event.body
  if (typeof data == 'string') {
    data = JSON.parse(data)
  }
  
  // construct put request items
  var requestItems = [];
  data.forEach(item => {
    requestItems.push({
      PutRequest: {
        Item: {
          created: { "N": item.created.toString() },
          temp: { "N": item.temp.toString() },
          rh: { "N": item.rh.toString() },
          wind: { "N": item.wind.toString() },
          clo: { "N": item.clo.toString() },
          label: { "S": item.label.toString() },
          score: { "N": item.score.toString() },
        }
      }
    });
  });
  
  if (requestItems.length == 0) {
    callback(null, {
      statusCode: 200,
      headers: {'Content-Type': 'text/plain'},
      body: 'No records created since request body length was zero',
    });
    return;
  }
  
  // ....

如果有新的项目来写,将建立一个新的对象,然后使用batchWriteItem从DynamoDB AWS SDK写的新项目。batchWriteItem比一系列putItem请求更有效,而且也是原子的。

https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/dynamodb-example-table-read-write-batch.html

代码语言:javascript
复制
// ...
 
  var params = {
    RequestItems: {
      [process.env.DYNAMODB_TABLE_DATA]: requestItems
    }
  };
  
  console.log('params>>>', params);
 
  ddb.batchWriteItem(params, function(err, data) {
    if (err) {
      console.log("Error", err);
      callback(Error(err), {
        statusCode: 500,
        headers: {'Content-Type': 'text/plain'},
        body: 'Error encountered during record creation',
      });
    } else {
      console.log("Success", data);
      callback(null, {
        statusCode: 200,
        headers: {'Content-Type': 'text/plain'},
        body: requestItems.length.toString() + ' records successfully created',
      });
    }
  });
};

现在已经构建了upload功能,还可以test.js扩展生成随机数据来测试工作流程并填充数据库。有关详细信息,请参见Github文件。

https://github.com/mikepm35/LambdaAutoTraining/blob/master/js/test.js

重新部署到该dev阶段并测试端点。此时,开始用数据填充DynamoDB非常有价值,可以通过手动调用该test.js函数来完成。

代码语言:javascript
复制
$ severless deploy --stage dev
...
$ curl -X POST "https://<api_id>.execute-api.<region>.amazonaws.com/dev/upload" -d '[{"created": 1570323309012, "temp": 75, "rh": 60, "wind": 0.6, "clo": 1.0, "label": "ok", "score": -1}]'
1 records created successfully

尽管该train.js函数尚未构建,但一旦达到批处理窗口或批处理大小,仍应看到它已被调用。

EC2:train.py

完成上传新数据的功能后,现在将重点转移到Python训练部分。将焦点转移到此处而不是完成JavaScript Lambda函数的动机是,train如果完成了EC2 / ECR集成,则可以更轻松地验证功能,否则将无法验证启动脚本是否正常运行。

要开始使用TensorFlow模型,请打开Jupyter Notebook(虚拟环境应该已经激活)。

代码语言:javascript
复制
$ cd py
$ jupyter notebook

打开train.ipynb,先前已将其创建为空文件。希望将关键字段作为环境参数传递给Docker容器,但是为了便于测试,将提供这些值。接下来,创建代表两个DynamoDB表的变量。

对于输入数据,将对DynamoDB数据表执行扫描。在LastEvaluatedKey将存在如果结果被分页,当响应是大于1MB恰好。

DynamoDB返回一个Decimal数据类型,因此将遍历数据集并转换为浮点以及对标签数据进行一次热编码。最后,此列表将转换为numpy数组,以输入到TensorFlow模型中。

为了创建模型,将使用TensorFlow的Keras API,更具体地说是使用顺序模型,该模型允许构建神经网络的各个层。本文的重点不是超参数优化,因此将使用非常简单的配置。重要的是要注意,必须定义输入形状,以便以后导入TensorFlow.js。

模型完成后,将使用tfjs模块中的转换器将其直接保存为TensorFlow.js可以导入的形式。然后将这些文件上传到S3并以当前纪元为键将其上传到新文件夹中。还将维护“最新”文件夹,以定义客户端应使用哪种模型进行预测。最后,每个模型拟合的结果将存储model在DynamoDB 中的表中。

由于data应该填充该表,因此现在可以在本地运行此笔记本并验证功能。

在模型开发完成之后,现在将开始准备Docker映像,首先是Dockerfile,该文件提供了构建映像的说明。

打开Dockerfile进行编辑,并如下所示进行更新,其用途如下:

  • 从标准的Python 3.7基本映像开始
  • 创建一个新用户 lambdaautotraining
  • 在Jupyter笔记本和需求文件中复制
  • 从需求文件安装Python库
  • 将Jupyter Notebook转换为标准Python文件并在图像启动时运行
代码语言:javascript
复制
FROM python:3.7
RUN echo $(python3 --version)
RUN useradd -ms /bin/bash lambdaautotraining
WORKDIR /home/lambdaautotraining
RUN apt-get update -y
COPY train.ipynb requirements.txt ./
RUN pip install -r requirements.txt
RUN chown -R lambdaautotraining:lambdaautotraining ./
USER lambdaautotraining
RUN jupyter nbconvert --to script train.ipynb
CMD ["python3","-u","train.py"]

接下来,在本地构建Docker容器,用ECR URI标记,登录到ECR,然后推送到存储库。

代码语言:javascript
复制
$ docker build -t lambda-auto-training-dev .
...
$ docker tag \
  lambda-auto-training-dev:latest \
  <ecr_id>.dkr.ecr.<region>.amazonaws.com/lambda-auto-training/lambda-auto-training-dev:latest
$ $(aws ecr get-login --no-include-email --region <region>)
...
$ docker push \
  <ecr_id>.dkr.ecr.<region>.amazonaws.com/lambda-auto-training/lambda-auto-training-dev:latest

可以手动启动EC2实例并执行命令以运行该映像,但是将创建触发Lambda函数并对其进行测试。

Lambda:train.js

trainLambda函数的主要目的是对新一批带标签的数据做出反应,然后启动一个新的EC2实例,以完全执行训练工作流程。与upload使用回调样式处理程序的函数不同,这里将使用async / await模式。

在此函数中定义的第一个变量是初始化脚本,该脚本将传递到EC2实例以进行启动。作为单独的shell脚本值得测试,但为简单起见,此处仅以字符串形式显示。该脚本的主要职责如下:

  • 下载并安装AWS CLI
  • 登录到ECR
  • 下拉所需的Docker映像
  • 运行Docker映像

请注意,该run命令具有一系列环境属性,这些属性是通过replace语句定义的。这些将在的训练Python脚本中用于与DynamoDB和S3进行交互。

最后,该字符串需要根据EC2要求进行base64编码。

代码语言:javascript
复制
'use strict';
 
const child_process = require("child_process");
 
const AWS = require('aws-sdk');
AWS.config.update({region: process.env.REGION});
const iam = new AWS.IAM();
const ec2 = new AWS.EC2({apiVersion: '2016-11-15'});
const cw = new AWS.CloudWatch({apiVersion: '2010-08-01'});
 
module.exports.train = async event => {  
  // Create init script
  var userData = `#!/bin/bash
  sudo yum install -y unzip
  curl "https://s3.amazonaws.com/aws-cli/awscli-bundle.zip" -o "awscli-bundle.zip"
  unzip awscli-bundle.zip
  sudo ./awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws
  $(aws ecr get-login --no-include-email --region <region>)
  docker pull <ecr_id>.dkr.ecr.<region>.amazonaws.com/<repo>:latest
  docker run --name keras-remote-training \
    -e "TABLE_MODELS=<table_models>" -e "TABLE_DATA=<table_data>" \
    -e "BUCKET=<bucket>" -e "REGION=<region>"\
    -d --rm <ecr_id>.dkr.ecr.<region>.amazonaws.com/<repo>:latest
  `;
  
  userData = userData.replace(/<region>/g, process.env.REGION);
  userData = userData.replace(/<ecr_id>/g, process.env.ECR_ID);
  userData = userData.replace(/<repo>/g, process.env.ECR_REPO);
  userData = userData.replace(/<table_models>/g, process.env.DYNAMODB_TABLE_MODELS);
  userData = userData.replace(/<table_data>/g, process.env.DYNAMODB_TABLE_DATA);
  userData = userData.replace(/<bucket>/g, process.env.BUCKET);
  
  console.log('Final userData>>> ', userData);
  
  var userDataBuff = new Buffer(userData);
  var userDataBase64 = userDataBuff.toString('base64');
  console.log('Base64 encoded userData>>>', userDataBase64);
  
  // ...

接下来,检索实例配置文件,该配置文件定义了EC2实例将使用的IAM角色。每个需要阻止的调用都使用带有await关键字的promise表单。

代码语言:javascript
复制
 // ...
 
  // Get instance profile
  var profileParams = {
    InstanceProfileName: process.env.SERVICE+'-'+process.env.STAGE,
  }
  
  var profileData = null;
  try {
    profileData = await iam.getInstanceProfile(profileParams).promise();
  } catch (err) {
    if (err.statusCode == 404) {
      console.log('Instance profile does not exist, creating');
      profileData = await iam.createInstanceProfile(profileParams).promise();
      child_process.execSync("sleep 10"); // alllows new profile to propogate
      profileParams.RoleName = process.env.IAM_ROLE;
      var roleData = await iam.addRoleToInstanceProfile(profileParams).promise();
      console.log('Added role to instance profile');
    } else {
      console.error('Unexpected error getting instance profile: ', err);
      return {
        statusCode: 500,
        body: JSON.stringify({error: "Error getting instance profile"}),
      };
    }
  }      
  
  // ...

有了实例配置文件,将为竞价型实例定义完整的EC2参数集。另一种选择是分别创建一个模板并直接启动它。还将在关闭时终止实例,这里的另一项优化是根据需要停止/启动持久实例。

代码语言:javascript
复制
 // ...
  
  // Set EC2 instance parameters
  var instanceParams = {
     ImageId: process.env.AMI_ID,
     InstanceType: process.env.INSTANCE_TYPE,
     KeyName: process.env.KEY_NAME,
     MinCount: 1,
     MaxCount: 1,
     UserData: userDataBase64,
     InstanceInitiatedShutdownBehavior: 'terminate',
     InstanceMarketOptions: {
       MarketType: 'spot',
       SpotOptions: {
         BlockDurationMinutes: process.env.SPOT_DURATION,
         InstanceInterruptionBehavior: 'terminate',
         SpotInstanceType: 'one-time',
       }
     },
     IamInstanceProfile: {
       Arn: profileData.InstanceProfile.Arn,
     },
  };
  
  // ...

现在,准备开始创建EC2。成功后,将创建并启用警报,当CPU降至某个阈值以下时,该警报将自动终止实例,将其用作完成训练的代理。

代码语言:javascript
复制
 // ...
  
  // Create EC2 instance
  try {
    var instanceData = await ec2.runInstances(instanceParams).promise();
    var instanceId = instanceData.Instances[0].InstanceId;
    console.log("Created instance", instanceId);
    
    // Create alarm to auto-terminate    
    var alarmParams = {
      AlarmName: process.env.SERVICE+'_CPU_Utilization_'+instanceId,
      ComparisonOperator: 'LessThanThreshold',
      EvaluationPeriods: 2,
      MetricName: 'CPUUtilization',
      Namespace: 'AWS/EC2',
      Period: 60*5,
      Statistic: 'Average',
      Threshold: 5.0,
      ActionsEnabled: true,
      AlarmActions: ['arn:aws:automate:'+process.env.REGION+':ec2:terminate'],
      AlarmDescription: 'Termination alarm for CPU below 5%',
      Dimensions: [
        {
          Name: 'InstanceId',
          Value: instanceId
        },
      ],
      Unit: 'Percent'
    };
    
    var alarmData = await cw.putMetricAlarm(alarmParams).promise();
    console.log('Alarm created: ', alarmData);
    
    // Enable action on alarm
    var paramsEnableAlarmAction = {
      AlarmNames: [alarmParams.AlarmName]
    };
    
    var actionData = await cw.enableAlarmActions(paramsEnableAlarmAction).promise();
    console.log("Alarm action enabled", actionData);
    
    return {
      statusCode: 200,
      body: JSON.stringify({result: "Instance created"}),
    };  
    
  } catch (err) {
    console.error('Error creating instance: ', err);
    return {
      statusCode: 500,
      body: JSON.stringify({error: "Error creating instance"}),
    };  
  }
};

现在,可以使用新功能更新开发环境。

代码语言:javascript
复制
$ cd ../js
$ serverless deploy --stage dev

在已经确认可以触发train.js工作后,将使用控制台测试训练工作流程。在AWS中,打开Lambda,DynamoDB,S3和EC2的服务页面并执行以下操作:

  • Lambda:输入为空时触发火车功能
  • EC2:验证实例是否创建了适当的警报
  • DynamoDB:验证模型信息已更新
  • S3:验证模型文件已上传
  • EC2:约10分钟后,确认实例已终止

Lambda:infer.js

完成完整的训练工作流程后,现在就可以构建预测/推断部分。主要目的infer是下载模型,加载到TensorFlow.js中,然后根据HTTP触发器提供给它的一组输入进行预测。该函数期望输入为对象数组,其键代表所需的模型输入字段。

TensorFlow.js的浏览器版本使用fetch的不是Node.js中的标准版本。为了解决这个问题,将安装node-fetch,并在fetch全局范围内使用它。

代码语言:javascript
复制
$ npm install node-fetch

接下来,必须下载模型。将再次需要解决以下事实:正在使用浏览器版本,该版本不希望访问标准本地文件系统。可以从tfjs-node项目中提取必要的模块,但是在本示例中,将利用中的直接HTTP下载选项loadLayersModel。

但是,由于S3存储桶尚未对外开放,因此需要确定如何允许这种访问。对于使用签名URL的 HTTP访问S3 是一个合理的选择,但是在下载步骤TensorFlow实际上在做两件事:

https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#getSignedUrl-property

  1. 下载model.json-可以在此处传递签名的URL
  2. 使用url根下载模型拓扑-步骤1中签名的url将不再起作用!

要解决此问题,将使用单独的代理,该代理将接收每个请求并将其重定向到适当的签名url。更新s3proxy以支持此操作,如下所示:

代码语言:javascript
复制
'use strict';
 
const AWS = require('aws-sdk');
AWS.config.update({region: process.env.REGION});
const s3 = new AWS.S3();
  
module.exports.s3proxy = async event => {    
  var filename = 'models/latest/model.json';
  console.log('Final filename: ', filename);
 
  var signedUrl = s3.getSignedUrl('getObject', {
      Bucket: process.env.BUCKET,
      Key: filename,
      Expires: 100 // seconds
  });
  
  console.log('Signed url: ', signedUrl);
  
  return {
    statusCode: 302,
    headers: {
      Location: signedUrl
    }
  };
 
};

回到infer函数,加载模型,将输入转换为2D张量并运行预测。arraySync会将结果转换为标准浮点数,并将每组输入转换为跨输出维度的一组预测。通过找到最大值,此预测将转换为简单的标签映射,然后在新的JSON对象中返回。

代码语言:javascript
复制
'use strict';
 
const tf = require('@tensorflow/tfjs');
 
// See https://github.com/tensorflow/tfjs/issues/2029
const nodeFetch = require('node-fetch');
global.fetch = nodeFetch;
 
const AWS = require('aws-sdk');
AWS.config.update({region: process.env.REGION});
const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'});
 
module.exports.infer = async event => {  
  // Expecting to be array of json objects with keys matching inputs
  var data = event.body
  if (typeof data == 'string') {
    data = JSON.parse(data)
  }
  
  var input = [];
  for (var entry in data) {
    input.push([entry.temp, entry.clo, entry.rh, entry.wind]);
  }
  
  // Download and load the model
  var url = process.env.API_URL + '/s3proxy/latest/model.json';
  console.log('s3proxy url: ', url);
  const model = await tf.loadLayersModel(url);
 
  // Run inference with predict()
  var predictResult = model.predict(tf.tensor2d(input));
  var resultArray = predictResult.arraySync();
  
  // Construct output
  var output = [];
  for (var entry in resultArray) {
    var maxIndex = entry.indexOf(Math.max(...entry));
    
    var simpleResult = null;
    switch (maxIndex) {
      case 0:
        simpleResult = 'cold';
        break;
      case 1:
        simpleResult = 'ok';
        break;
      case 2:
        simpleResult = 'warm';
        break;
    }
    
    output.push({
      simpleResult: simpleResult,
      rawResult: entry,
    });
  }
  
  return {
    statusCode: 200,
    body: JSON.stringify(output),
  };
 
};

测试整个工作流程

如果创建了该test功能,则可以设置cron作业以在定义的时间间隔执行,以模拟实际流量。为此,需要将CloudWatch事件触发器(默认禁用)添加到我们的serverless.yml配置中:

代码语言:javascript
复制
test:
    handler: js/test.test  
    events:
      - schedule:
          rate: rate(2 minutes)
          enabled: false

手动启用触发器可能会有些混乱,因为Lambda UI将其显示为“已启用”,但是实际上您需要转到CloudWatch启用底层事件:

问题在于,AWS::Events::RuleLambda UI中既有一个(可以启用/禁用),也有一个触发器(可以启用/禁用)。Lambda UI显示触发状态,该状态已启用。但是,实际上无法通过CloudFormation来解决这一问题。该AWS::Events::Rule设置为禁用,这是设定CloudFormation。如果触发器或规则被禁用,它将不会触发您的Lambda。

对于预测方面,可以像以前一样手动进行测试,或者扩展测试功能策略以包括推断。

准备就绪后,现在可以部署到“生产”阶段。对于Docker映像,将向现有映像添加一个新标签,并将其推送到生产存储库中。

代码语言:javascript
复制
$ serverless deploy --stage prod
...
$ docker tag \
  lambda-auto-training-dev:latest \
  <ecr_id>.dkr.ecr.<region>.amazonaws.com/lambda-auto-training/lambda-auto-training-prod:latest
...
$ docker push \
  <ecr_id>.dkr.ecr.<region>.amazonaws.com/lambda-auto-training/lambda-auto-training-prod:latest

最后的想法

鉴于这是一个原型,在部署到实际生产环境之前,应考虑许多方面:

  • 持久性API端点的域集成(请参阅serverless-domain-manager插件)。
  • HTTP事件输入应经过验证,并包括错误处理。
  • 可以将暖机功能添加到面向客户端的端点,以限制冷启动时较长的调用时间。
  • IAM资源权限应加强。将这种环境封装在VPC中将是一个不错的选择,并且还提供了代理的替代方法,以允许HTTP访问S3。
  • DynamoDB流触发器是比较初级的,并且在大容量环境中可能最终变得过于激进。更为健壮的解决方案可能是将新事件附加到文件中并分别对新事件进行计数,这也可以减轻每次训练运行时扫描整个表的负担。
  • 如果EC2实例在每次运行后终止,最终将需要清除未使用的警报。如果使用了停止/启动一个实例的另一种方式,则警报也可以重新使用。
  • 为了保护生产,应在训练工作中应用阈值,以免引入性能不佳的模型进行预测。

可以在GitHub上查看所有代码:

https://github.com/mikepm35/LambdaAutoTraining

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 相约机器人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器镜像服务
容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档