
Kubeflow Practice Notes

Author: operator开发工程师
Published: 2023-11-16 19:55:05
Collected in the column: 云原生民工

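Building a cloud-native AI platform on Kubernetes

Improving compute resource utilization

  1. GPU virtualization

GPUManager works by wrapping the GPU driver. It intercepts certain key driver interfaces (such as GPU memory allocation and CUDA thread creation) and, during interception, limits how much compute a user process may consume. The approach is lightweight and has low overhead: GPUManager itself costs only about 5% in performance. It supports isolating GPU compute and GPU memory between containers on the same card. As a result, developers can share a GPU in low-utilization scenarios such as coding, and their resources are not preempted by others while they debug on the same card.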
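The interception idea can be illustrated with a toy sketch. This is not GPUManager's code: `QuotaAllocator`, `MemoryQuotaExceeded`, and `fake_driver_alloc` are hypothetical names, and a `bytearray` stands in for a real driver allocation. It only shows the pattern of wrapping an allocation call and rejecting requests that exceed a per-container quota.

```python
class MemoryQuotaExceeded(RuntimeError):
    """Raised when an allocation would exceed the container's quota."""


class QuotaAllocator:
    """Wraps an allocation function and enforces a per-container byte quota."""

    def __init__(self, real_alloc, quota_bytes: int):
        self._real_alloc = real_alloc  # the intercepted (wrapped) driver call
        self._quota = quota_bytes
        self._used = 0

    @property
    def used(self) -> int:
        return self._used

    def alloc(self, size: int):
        # Check the quota before forwarding the call to the real allocator.
        if self._used + size > self._quota:
            left = self._quota - self._used
            raise MemoryQuotaExceeded(f"requested {size} bytes, {left} left")
        handle = self._real_alloc(size)
        self._used += size
        return handle


def fake_driver_alloc(size: int) -> bytearray:
    """Hypothetical stand-in for the real driver allocation call."""
    return bytearray(size)


if __name__ == "__main__":
    allocator = QuotaAllocator(fake_driver_alloc, quota_bytes=1024)
    allocator.alloc(600)
    try:
        allocator.alloc(600)  # would exceed the 1024-byte quota
    except MemoryQuotaExceeded as err:
        print("rejected:", err)
```

A real implementation intercepts the driver library calls themselves, which is outside what a Python sketch can show.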

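  2. Compute scheduling for the training cluster

In Kubernetes, training tasks are created as Jobs; you only need to specify the GPU resources the task requires. Combined with a message queue, the training cluster's compute utilization can reach full load.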
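As a rough sketch of what "only specifying the GPU resources" looks like, the helper below builds a `batch/v1` Job manifest as a plain Python dict. The function name, the job and image names, and the `nvidia.com/gpu` resource name (exposed by the NVIDIA device plugin) are assumptions for illustration; a GPU-sharing plugin would expose its own resource names.

```python
import json


def gpu_training_job(name: str, image: str, gpus: int = 1) -> dict:
    """Build a batch/v1 Job manifest whose container requests `gpus` GPUs."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            "backoffLimit": 0,  # do not retry a failed training run automatically
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "trainer",
                            "image": image,
                            # Extended resources such as GPUs are set under limits.
                            "resources": {"limits": {"nvidia.com/gpu": gpus}},
                        }
                    ],
                }
            },
        },
    }


if __name__ == "__main__":
    # JSON is valid YAML, so the output can be piped to `kubectl apply -f -`.
    job = gpu_training_job("mnist-train", "example.com/train:v1")
    print(json.dumps(job, indent=2))
```

A queue consumer could call such a helper once per queued task, so that pending Jobs start as soon as GPUs free up.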

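  3. Resource monitoring

Resource monitoring gives key guidance for optimizing coding and training workloads on the cluster. It also makes it possible to cap the total GPU usage of each project and the GPU allocation of each user.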
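One possible way to cap a project's total GPU usage is a Kubernetes ResourceQuota, assuming one namespace per project. The sketch below builds such a manifest and adds a trivial admission check; the function names and the `nvidia.com/gpu` resource name are assumptions, not something the original setup confirms.

```python
def gpu_quota(namespace: str, max_gpus: int) -> dict:
    """Build a ResourceQuota that caps the total GPUs requested in a namespace."""
    return {
        "apiVersion": "v1",
        "kind": "ResourceQuota",
        "metadata": {"name": "gpu-quota", "namespace": namespace},
        # Quotas on extended resources use the "requests." prefix.
        "spec": {"hard": {"requests.nvidia.com/gpu": str(max_gpus)}},
    }


def within_quota(requested: int, in_use: int, max_gpus: int) -> bool:
    """Return True if a new request still fits under the project's GPU cap."""
    return in_use + requested <= max_gpus


if __name__ == "__main__":
    print(gpu_quota("project-a", 8))
    print(within_quota(requested=2, in_use=7, max_gpus=8))
```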

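Introduction to Kubeflow

Kubeflow is an open-source platform, originally developed by Google, that covers the machine learning model development lifecycle. It consists of a set of tools that address each stage of that lifecycle: data exploration, feature engineering, feature transformation, model experimentation, model training, model evaluation, model tuning, model serving, and model versioning. Its defining property is that it is designed to run on top of Kubernetes, so it benefits from what a Kubernetes cluster provides, such as container orchestration and autoscaling.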

https://www.kubeflow.org/docs/images/kubeflow-overview-platform-diagram.svg

Kubeflow components in the ML workflow

https://www.kubeflow.org/docs/images/kubeflow-overview-workflow-diagram-2.svg

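Installing Kubeflow

Download the code repository whose image addresses have been modified, then apply the manifests:

    git clone https://github.com/zhuyaguang/manifests.git
    # kustomize resolves the "example" directory relative to the repository root
    cd manifests
    while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done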

Starting the Kubeflow UI

    kubectl port-forward --address 0.0.0.0 -n istio-system svc/istio-ingressgateway 8080:80 &

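Notes on the book 《Kubeflow学习指南》 (Kubeflow learning guide)

Code repository for the book

Setting up the image registry

Kaniko configuration guide: https://github.com/GoogleContainerTools/kaniko#pushing-to-different-registries

Creating a Kubeflow project: handwritten digit recognition

Example code for querying the model: https://github.com/intro-to-ml-with-kubeflow/intro-to-ml-with-kubeflow-examples/blob/master/ch2/query-endpoint.py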

    import requests
    import numpy as np

    # Note: tensorflow.examples.tutorials is only available in TensorFlow 1.x
    from tensorflow.examples.tutorials.mnist import input_data
    from matplotlib import pyplot as plt


    def download_mnist():
        return input_data.read_data_sets("MNIST_data/", one_hot=True)


    def gen_image(arr):
        two_d = (np.reshape(arr, (28, 28)) * 255).astype(np.uint8)
        plt.imshow(two_d, cmap=plt.cm.gray_r, interpolation='nearest')
        return plt


    AMBASSADOR_API_IP = "10.53.148.167:30134"

    # Pick one MNIST sample, display it, and send it to the deployed model
    mnist = download_mnist()
    batch_xs, batch_ys = mnist.train.next_batch(1)
    chosen = 0
    gen_image(batch_xs[chosen]).show()
    data = batch_xs[chosen].reshape((1, 784))
    features = ["X" + str(i + 1) for i in range(0, 784)]
    request = {"data": {"names": features, "ndarray": data.tolist()}}
    deploymentName = "mnist-classifier"
    uri = "http://" + AMBASSADOR_API_IP + "/seldon/" + \
        deploymentName + "/api/v0.1/predictions"

    response = requests.post(uri, json=request)
    print(response.status_code)

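Kubeflow component design

Central Dashboard: the main UI

Kubeflow Notebooks: lets you set up Jupyter notebooks

Kubeflow Pipelines: pipeline orchestration

Katib: hyperparameter tuning

Training Operators: CRD controllers for the various kinds of training jobs

Multi-Tenancy: multi-tenant support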

Pipeline

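A pipeline is essentially a graph of container executions. Besides specifying which containers run and in what order, it lets users pass parameters to the pipeline as a whole and pass data between containers.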
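To make the "graph of executions" idea concrete, here is a minimal sketch that does not use Kubeflow at all. Plain Python functions stand in for containers, and the standard-library `graphlib` module (Python 3.9+) decides the order. The step names and the `run_pipeline` helper are hypothetical.

```python
from graphlib import TopologicalSorter


def run_pipeline(steps, deps, params):
    """Run step functions in dependency order, passing upstream outputs downstream."""
    outputs = {}
    for name in TopologicalSorter(deps).static_order():
        # Each step sees the pipeline-level params and the outputs it depends on.
        upstream = {dep: outputs[dep] for dep in deps[name]}
        outputs[name] = steps[name](params, upstream)
    return outputs


# Each "step" stands in for a container; deps maps a step to its predecessors.
steps = {
    "download": lambda params, up: f"data from {params['url']}",
    "count": lambda params, up: len(up["download"]),
    "report": lambda params, up: f"{up['count']} characters",
}
deps = {"download": set(), "count": {"download"}, "report": {"count"}}

if __name__ == "__main__":
    result = run_pipeline(steps, deps, {"url": "gs://bucket/file.txt"})
    print(result["report"])
```

Kubeflow Pipelines does the same bookkeeping at the container level: the compiled YAML records each step's image, its dependencies, and which outputs feed which inputs.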

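Every pipeline involves the following four essential steps:

  1. Create the container
  2. Create an operation
  3. Order the operations
  4. Output an executable YAML file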

    from kfp import dsl, compiler
    # create_component_from_func lives in kfp.components (KFP SDK v1)
    import kfp.components as comp


    @comp.create_component_from_func
    def echo_op():
        print("Hello world")


    @dsl.pipeline(
        name='my-first-pipeline',
        description='A hello world pipeline.'
    )
    def hello_world_pipeline():
        echo_task = echo_op()


    if __name__ == '__main__':
        # Writes the compiled pipeline next to this script as <script>.yaml
        compiler.Compiler().compile(hello_world_pipeline, __file__ + '.yaml')

A basic pipeline example

    #!/usr/bin/env python
    # coding: utf-8

    from typing import NamedTuple

    import kfp
    from kfp import compiler
    import kfp.dsl as dsl
    import kfp.components as comp


    # Define a Python function
    def add(a: float, b: float) -> float:
        '''Calculates sum of two arguments'''
        return a + b


    add_op = comp.func_to_container_op(add)


    def my_divmod(
        dividend: float, divisor: float
    ) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float)]):
        '''Divides two numbers and calculate the quotient and remainder'''

        # Imports inside a component function:
        import numpy as np

        # This function demonstrates how to use nested functions inside a component function:
        def divmod_helper(dividend, divisor):
            return np.divmod(dividend, divisor)

        (quotient, remainder) = divmod_helper(dividend, divisor)

        from collections import namedtuple
        divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder'])
        return divmod_output(quotient, remainder)


    divmod_op = comp.func_to_container_op(
        my_divmod, base_image='tensorflow/tensorflow:1.14.0-py3')


    @dsl.pipeline(
        name='Calculation pipeline',
        description='A toy pipeline that performs arithmetic calculations.')
    def calc_pipeline(
        a='1',  # defaults are strings that the components parse as float
        b='7',
        c='17',
    ):
        # Passing pipeline parameter and a constant value as operation arguments
        add_task = add_op(a, 4)  # Returns a dsl.ContainerOp class instance.

        # Passing a task output reference as operation arguments
        # For an operation with a single return value, the output reference can be
        # accessed using `task.output` or `task.outputs['output_name']` syntax
        divmod_task = divmod_op(add_task.output, b)

        # For an operation with a multiple return values, the output references can be
        # accessed using `task.outputs['output_name']` syntax
        result_task = add_op(divmod_task.outputs['quotient'], c)


    if __name__ == '__main__':
        # Compiling the pipeline
        kfp.compiler.Compiler().compile(calc_pipeline, 'ch04.yaml')

Storing data between steps

VolumeOp in Kubeflow Pipelines creates an automatically managed persistent volume.

    # The class is dsl.VolumeOp (capital V)
    dvop = dsl.VolumeOp(name="create_pvc", resource_name="my-pvc-2", size="5Gi", modes=dsl.VOLUME_MODE_RWO)

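You can also rely on MinIO: write the file to the container's local filesystem and declare it in the ContainerOp arguments (file_outputs).

    # data_url must be defined beforehand
    fetch = kfp.dsl.ContainerOp(
        name="download",
        image="busybox",  # assumption: the original snippet omits the required image argument
        command=['sh', '-c'],
        arguments=[
            'sleep 1;'
            'mkdir -p /tmp/data;'
            'wget ' + data_url + ' -O /tmp/data/result.csv'
        ],
        file_outputs={'downloaded': '/tmp/data'}
    )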

Example: passing data between pipeline steps

    from kfp import dsl, compiler


    def gcs_download_op(url):
        return dsl.ContainerOp(
            name='GCS - Download',
            image='google/cloud-sdk:279.0.0',
            command=['sh', '-c'],
            arguments=['gsutil cat $0 | tee $1', url, '/tmp/results.txt'],
            # The file content becomes the step output named "data"
            file_outputs={
                'data': '/tmp/results.txt',
            }
        )


    def echo_op(text):
        return dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=['echo "$0"', text]
        )


    @dsl.pipeline(
        name='sequential-pipeline',
        description='A pipeline with two sequential steps.'
    )
    def sequential_pipeline(url='gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt'):
        """A pipeline with two sequential steps."""

        download_task = gcs_download_op(url)
        # Consuming download_task.output makes echo run after download
        echo_task = echo_op(download_task.output)


    if __name__ == '__main__':
        compiler.Compiler().compile(sequential_pipeline, __file__ + '.yaml')

func_to_container

There are several ways to turn a function into a container.

1. Parameters plus image: the business logic is delivered inside the image.

    def SendMsg(
        send_msg: str = 'akash'
    ):
        return dsl.ContainerOp(
            name = 'Print msg',
            image = 'docker.io/akashdesarda/comp1:latest',  # the logic lives in this image
            command = ['python', 'msg.py'],
            arguments=[
                '--msg', send_msg
            ],
            file_outputs={
                'output': '/output.txt',
            }
        )

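2. Parameters plus function plus base image: the business logic is written directly in the function and runs on the base image. There is a known issue: it tries to pull the busybox image, so the base image has to be changed in the source code.

    from typing import NamedTuple

    from kfp.components import func_to_container_op


    def load_data(log_folder: str) -> NamedTuple('Outputs', [('start_time_string', str)]):
        # some code here
        # the logic lives in this function
        pass


    load_data_op = func_to_container_op(
        func=load_data,
        base_image="mike0355/k8s-facenet-distributed-training:4",
    )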

3. The most recent versions seem to favor the load-from-file/URL/text approach.

    import kfp
    import kfp.components as comp
    import kfp.dsl as dsl

    create_step_get_lines = comp.load_component_from_text("""
    name: Get Lines
    description: Gets the specified number of lines from the input file.

    inputs:
    - {name: input_1, type: String, description: 'Data for input_1'}
    - {name: parameter_1, type: Integer, default: '100', description: 'Number of lines to copy'}

    outputs:
    - {name: output_1, type: String, description: 'output_1 data.'}

    implementation:
      container:
        image: zhuyaguang/pipeline:v4
        command: [
          python3,
          # Path of the program inside the container
          /pipelines/component/src/v2_2.py,
          --input1-path,
          {inputPath: input_1},
          --param1,
          {inputValue: parameter_1},
          --output1-path,
          {outputPath: output_1},
        ]""")


    # Define your pipeline
    @dsl.pipeline(
        pipeline_root='',
        name="example-pipeline",
    )
    def my_pipeline():
        get_lines_step = create_step_get_lines(
            # Input name "Input 1" is converted to pythonic parameter name "input_1"
            input_1='one\ntwo\nthree\nfour\nfive\nsix\nseven\neight\nnine\nten',
            parameter_1='5',
        )


    if __name__ == '__main__':
        # Compiling the pipeline
        kfp.compiler.Compiler().compile(my_pipeline, 'v2.yaml')

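For more approaches and examples, see the standard component library.

Advanced pipeline topics

  1. Complex conditional logic
  2. Running a pipeline periodically with recurring runs

Data preparation and feature preparation

2022 list of data preparation tools

Metadata

ML Metadata

Training a machine learning model with TFJob (predicting user purchase behavior)

User purchase record data

Notebook base image: tensorflow-1.15.2-notebook-cpu:1.0.0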

  • Install the MinIO client

    # The original address http://dl.minio.org.cn/client/mc/release/linux-amd64/mc now returns 404;
    # download from https://dl.min.io/client/mc/release/linux-amd64/ instead
    wget https://dl.min.io/client/mc/release/linux-amd64/mc
    chmod +x mc
    ./mc --help

  • Deploy the MinIO service

    kubectl port-forward --address 0.0.0.0 -n kubeflow svc/minio-service 9000:9000 &

    ./mc config host add minio http://10.101.32.13:9000 minio minio123

    ./mc mb minio/data

    ./mc cp recommend_1.csv minio/data/recommender/user.csv
    ./mc cp trx_data.csv minio/data/recommender/transations.csv

Create a notebook and run the TensorFlow training

Use public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter:v1.5.0 as the base image.

Training code location

Deploy the TensorFlow job with TFJob, packaging the training code into a container:

    FROM tensorflow/tensorflow:1.15.0-py3
    RUN pip3 install --upgrade pip
    RUN pip3 install pandas --upgrade
    RUN pip3 install keras --upgrade
    RUN pip3 install minio --upgrade
    RUN pip3 install kubernetes --upgrade
    RUN pip3 install kfmd --upgrade
    RUN mkdir -p /opt/kubeflow
    COPY Recommender_Kubeflow.py /opt/kubeflow/
    ENTRYPOINT ["python3", "/opt/kubeflow/Recommender_Kubeflow.py"]

Build the image:

    docker build -t kubeflow/recommenderjob:1.0 .

TFJob.yaml

    apiVersion: kubeflow.org/v1
    kind: TFJob
    metadata:
      name: recommenderjob
      namespace: kubeflow-user-example-com
    spec:
      tfReplicaSpecs:
        Worker:
          replicas: 1
          restartPolicy: Never
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "false"
            spec:
              containers:
              - name: tensorflow
                image: 10.100.29.62/kubeflow/recommender:1.0
              imagePullSecrets:
              - name: harbor

For more on TFJob and PyTorchJob, see the documentation, which covers more detailed configuration and the use of different hardware such as GPUs and TPUs.
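As one hedged illustration of such configuration, the helper below takes a TFJob or PyTorchJob manifest as a Python dict and returns a copy whose replica containers request GPUs. The helper name `with_gpus` and the `nvidia.com/gpu` resource name are assumptions; the manifest in the usage example is a trimmed version of the TFJob above.

```python
import copy


def with_gpus(job: dict, replica_type: str, gpus: int) -> dict:
    """Return a copy of a TFJob/PyTorchJob manifest whose replica containers request GPUs."""
    patched = copy.deepcopy(job)
    # TFJob keeps replicas under tfReplicaSpecs, PyTorchJob under pytorchReplicaSpecs.
    specs_key = next(k for k in patched["spec"] if k.endswith("ReplicaSpecs"))
    pod_spec = patched["spec"][specs_key][replica_type]["template"]["spec"]
    for container in pod_spec["containers"]:
        limits = container.setdefault("resources", {}).setdefault("limits", {})
        limits["nvidia.com/gpu"] = gpus
    return patched


tfjob = {
    "apiVersion": "kubeflow.org/v1",
    "kind": "TFJob",
    "metadata": {"name": "recommenderjob"},
    "spec": {
        "tfReplicaSpecs": {
            "Worker": {
                "replicas": 1,
                "restartPolicy": "Never",
                "template": {
                    "spec": {
                        "containers": [
                            {"name": "tensorflow",
                             "image": "10.100.29.62/kubeflow/recommender:1.0"}
                        ]
                    }
                },
            }
        }
    },
}

if __name__ == "__main__":
    gpu_job = with_gpus(tfjob, "Worker", gpus=1)
    print(gpu_job["spec"]["tfReplicaSpecs"]["Worker"]["template"]["spec"]["containers"][0])
```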

Training a machine learning model with PyTorchJob (Sun Hao's patent retrieval project)

  • Training code train.py

    #!/usr/bin/env python
    # coding: utf-8

    from transformers import (
        BertConfig,
        BertTokenizer,
        BertForMaskedLM,
        LineByLineTextDataset,
        DataCollatorForLanguageModeling,
        Trainer,
        TrainingArguments,
    )
    import torch
    import tokenizers
    import argparse


    def main(args):
        tokenizer_kwargs = {
            "model_max_length": 512
        }

        # The tokenizer vocabulary (vocab.txt) is copied into this directory by the Dockerfile
        tokenizer = BertTokenizer.from_pretrained('/home/pipeline-demo/', **tokenizer_kwargs)
        config_new = BertConfig.from_pretrained(args.config)
        model = BertForMaskedLM.from_pretrained(args.model, config=config_new)
        model.resize_token_embeddings(len(tokenizer))

        train_dataset = LineByLineTextDataset(tokenizer=tokenizer, file_path=args.file_path, block_size=512)
        # Masked language modeling: mask 15% of the tokens
        data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)

        pretrain_batch_size = 16
        num_train_epochs = 5
        training_args = TrainingArguments(
            output_dir='/home/pipeline-demo/args',
            overwrite_output_dir=True,
            num_train_epochs=num_train_epochs,
            learning_rate=1e-4,
            weight_decay=0.01,
            warmup_steps=10000,
            local_rank=args.local_rank,  # dataloader_pin_memory = False,
            per_device_train_batch_size=pretrain_batch_size,
            logging_strategy="epoch",
            save_strategy="epoch",
            save_total_limit=1)

        trainer = Trainer(
            model=model,
            args=training_args,
            data_collator=data_collator,
            train_dataset=train_dataset)

        trainer.train()
        trainer.save_model(args.save_dir)


    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="nezha_train")
        parser.add_argument("--config", type=str, default="bert-base-uncased", help="second-stage training (nezha)")
        parser.add_argument("--model", type=str, default="bert-base-uncased", help="second-stage training (nezha)")
        parser.add_argument("--file_path", type=str, default="/home/pipeline-demo/newfileaa", help="second-stage training (nezha)")
        parser.add_argument("--save_dir", type=str, default="/home/pipeline-demo", help="second-stage training (nezha)")
        parser.add_argument("--local_rank", type=int, default=-1, help="For distributed training: local_rank")
        args = parser.parse_args()
        main(args)

  • Dockerfile

    FROM python:3.7
    RUN python3 -m pip install transformers
    RUN python3 -m pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple
    RUN python3 -m pip install tokenizers
    RUN python3 -m pip install argparse
    COPY ./vocab.txt /home/pipeline-demo/vocab.txt
    COPY ./newfileaa /home/pipeline-demo/newfileaa
    COPY ./train.py /home/pipeline-demo/train.py

Build the image:

    docker build -f Dockerfile -t 10.100.29.62/kubeflow/train:v2 ./

  • PyTorchJob.yaml

    apiVersion: "kubeflow.org/v1"
    kind: PyTorchJob
    metadata:
      name: pytorch-simple
      namespace: kubeflow
    spec:
      pytorchReplicaSpecs:
        Master:
          replicas: 1
          restartPolicy: OnFailure
          template:
            spec:
              containers:
                - name: pytorch
                  image: 10.100.29.62/kubeflow/train:v2
                  imagePullPolicy: Always
                  command:
                    - "python3"
                    - "/home/pipeline-demo/train.py"
        Worker:
          replicas: 1
          restartPolicy: OnFailure
          template:
            spec:
              containers:
                - name: pytorch
                  image: 10.100.29.62/kubeflow/train:v2
                  imagePullPolicy: Always
                  command:
                    - "python3"
                    - "/home/pipeline-demo/train.py"
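When a PyTorchJob runs, the training operator typically injects rendezvous settings (MASTER_ADDR, MASTER_PORT, WORLD_SIZE, RANK) into each replica as environment variables. The sketch below shows one way a training script could read them. The `DistEnv` type, the `read_dist_env` helper, and the fallback defaults are assumptions for illustration and are not part of the train.py above.

```python
import os
from typing import Mapping, NamedTuple


class DistEnv(NamedTuple):
    master_addr: str
    master_port: int
    world_size: int
    rank: int


def read_dist_env(env: Mapping[str, str] = os.environ) -> DistEnv:
    """Read the rendezvous settings, falling back to single-process defaults."""
    return DistEnv(
        master_addr=env.get("MASTER_ADDR", "localhost"),
        master_port=int(env.get("MASTER_PORT", "23456")),
        world_size=int(env.get("WORLD_SIZE", "1")),
        rank=int(env.get("RANK", "0")),
    )


if __name__ == "__main__":
    dist = read_dist_env()
    role = "master" if dist.rank == 0 else "worker"
    print(f"{role}: rank {dist.rank} of {dist.world_size}, "
          f"rendezvous at {dist.master_addr}:{dist.master_port}")
```

With one Master and one Worker replica as in the manifest above, one would expect a world size of 2, with the Master holding rank 0.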

TFJob distributed training: MNIST example

Problems

This article is shared from the author's personal site/blog as part of the Tencent Cloud self-media sharing program.
Originally published: 2022-04-10
