使用TPU在PyTorch中实现ResNet50

摘要:
PyTorch通过XLA提供此功能,XLA是一种线性代数编译器,可以针对多种类型的硬件,包括GPU和TPU。PyTorch/XLA环境与Google Cloud TPU集成,以实现更快的执行速度。在本文中,我们将在PyTorch中使用TPU来演示ResNet50(一种深度卷积神经网络)的实现。该模型将在PyTorch/XLA环境中进行训练和测试,以完成CIFAR10数据集的分类任务。您还可以通过打印TPU地址来检查TPU。TPU_Path='rpc://'+os。环境['COLAB_TPU_ADDR']打印在下一步中,我们将安装XLA环境以加快执行过程。

作者|DR. VAIBHAV KUMAR
编译|VK
来源|Analytics In Diamag

PyTorch通过提供大量强大的工具和技术,一直在推动计算机视觉和深度学习领域的发展。

在计算机视觉领域,基于深度学习的执行需要处理大量的图像数据集,因此需要一个加速的环境来加快执行过程以达到可接受的精度水平。

PyTorch通过XLA(加速线性代数)提供了这一特性,XLA是一种线性代数编译器,可以针对多种类型的硬件,包括GPU和TPU。PyTorch/XLA环境与Google云TPU集成,实现了更快的执行速度。

使用TPU在PyTorch中实现ResNet50第1张

在本文中,我们将在PyTorch中使用TPU演示一种深卷积神经网络ResNet50的实现。

该模型将在PyTorch/XLA环境中进行训练和测试,以完成CIFAR10数据集的分类任务。我们还将检查在50个epoch训练所花费的时间。

ResNet50在Pytorch的实现

为了利用TPU的功能,这个实现是在Google Colab中完成的。首先,我们需要从Notebook设置下的硬件加速器中选择TPU。

使用TPU在PyTorch中实现ResNet50第2张

选择TPU后,我们将使用下面的行验证环境代码:

import os
assert os.environ['COLAB_TPU_ADDR']

如果启用了TPU,它将成功执行,否则它将抛出‘KeyError: ‘COLAB_TPU_ADDR’’。你也可以通过打印TPU地址来检查TPU。

TPU_Path = 'grpc://'+os.environ['COLAB_TPU_ADDR']
print('TPU Address:', TPU_Path)

使用TPU在PyTorch中实现ResNet50第3张

在下一步中,我们将安装XLA环境以加快执行过程。我们在上一篇文章中实现了卷积神经网络。

VERSION = "20200516"
!curl https://raw.githubusercontent.com/pytorch/xla/master/contrib/scripts/env-setup.py -o pytorch-xla-env-setup.py
!python pytorch-xla-env-setup.py --version $VERSION

现在,我们将在这里导入所有必需的库。

from matplotlib import pyplot as plt
import numpy as np
import os
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch_xla
import torch_xla.core.xla_model as xm
import torch_xla.debug.metrics as met
import torch_xla.distributed.parallel_loader as pl
import torch_xla.distributed.xla_multiprocessing as xmp
import torch_xla.utils.utils as xu
import torchvision
from torchvision import datasets, transforms
import time
from google.colab.patches import cv2_imshow
import cv2

导入库之后,我们将定义并初始化所需的参数。

# 定义参数
FLAGS = {}
FLAGS['data_dir'] = "/tmp/cifar"
FLAGS['batch_size'] = 128
FLAGS['num_workers'] = 4
FLAGS['learning_rate'] = 0.02
FLAGS['momentum'] = 0.9
FLAGS['num_epochs'] = 50
FLAGS['num_cores'] = 8
FLAGS['log_steps'] = 20
FLAGS['metrics_debug'] = False

在下一步中,我们将定义ResNet50模型。

class BasicBlock(nn.Module):
  expansion = 1

  def __init__(self, in_planes, planes, stride=1):
    super(BasicBlock, self).__init__()
    self.conv1 = nn.Conv2d(
        in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(planes)
    self.conv2 = nn.Conv2d(
        planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(planes)

    self.shortcut = nn.Sequential()
    if stride != 1 or in_planes != self.expansion * planes:
      self.shortcut = nn.Sequential(
          nn.Conv2d(
              in_planes,
              self.expansion * planes,
              kernel_size=1,
              stride=stride,
              bias=False), nn.BatchNorm2d(self.expansion * planes))

  def forward(self, x):
    out = F.relu(self.bn1(self.conv1(x)))
    out = self.bn2(self.conv2(out))
    out += self.shortcut(x)
    out = F.relu(out)
    return out

class ResNet(nn.Module):

  def __init__(self, block, num_blocks, num_classes=10):
    super(ResNet, self).__init__()
    self.in_planes = 64

    self.conv1 = nn.Conv2d(
        3, 64, kernel_size=3, stride=1, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(64)
    self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
    self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
    self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
    self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
    self.linear = nn.Linear(512 * block.expansion, num_classes)

  def _make_layer(self, block, planes, num_blocks, stride):
    strides = [stride] + [1] * (num_blocks - 1)
    layers = []
    for stride in strides:
      layers.append(block(self.in_planes, planes, stride))
      self.in_planes = planes * block.expansion
    return nn.Sequential(*layers)

  def forward(self, x):
    out = F.relu(self.bn1(self.conv1(x)))
    out = self.layer1(out)
    out = self.layer2(out)
    out = self.layer3(out)
    out = self.layer4(out)
    out = F.avg_pool2d(out, 4)
    out = torch.flatten(out, 1)
    out = self.linear(out)
    return F.log_softmax(out, dim=1)

def ResNet50():
  return ResNet(BasicBlock, [3, 4, 6, 4, 3])

下面的代码片段将定义加载CIFAR10数据集、准备训练和测试数据集、训练过程和测试过程的函数。

SERIAL_EXEC = xmp.MpSerialExecutor()
# 只在内存中实例化一次模型权重。
WRAPPED_MODEL = xmp.MpModelWrapper(ResNet50())

def train_resnet50():
  torch.manual_seed(1)

  def get_dataset():
    norm = transforms.Normalize(
        mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010))
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        norm,
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        norm,
    ])
    train_dataset = datasets.CIFAR10(
        root=FLAGS['data_dir'],
        train=True,
        download=True,
        transform=transform_train)
    test_dataset = datasets.CIFAR10(
        root=FLAGS['data_dir'],
        train=False,
        download=True,
        transform=transform_test)
   
    return train_dataset, test_dataset
 
  # 使用串行执行器可以避免多个进程
  # 下载相同的数据。
  train_dataset, test_dataset = SERIAL_EXEC.run(get_dataset)

  train_sampler = torch.utils.data.distributed.DistributedSampler(
      train_dataset,
      num_replicas=xm.xrt_world_size(),
      rank=xm.get_ordinal(),
      shuffle=True)
  train_loader = torch.utils.data.DataLoader(
      train_dataset,
      batch_size=FLAGS['batch_size'],
      sampler=train_sampler,
      num_workers=FLAGS['num_workers'],
      drop_last=True)
  test_loader = torch.utils.data.DataLoader(
      test_dataset,
      batch_size=FLAGS['batch_size'],
      shuffle=False,
      num_workers=FLAGS['num_workers'],
      drop_last=True)

  # 将学习率缩放
  learning_rate = FLAGS['learning_rate'] * xm.xrt_world_size()

  # 获取损失函数、优化器和模型
  device = xm.xla_device()
  model = WRAPPED_MODEL.to(device)
  optimizer = optim.SGD(model.parameters(), lr=learning_rate,
                        momentum=FLAGS['momentum'], weight_decay=5e-4)
  loss_fn = nn.NLLLoss()

  def train_loop_fn(loader):
    tracker = xm.RateTracker()
    model.train()
    for x, (data, target) in enumerate(loader):
      optimizer.zero_grad()
      output = model(data)
      loss = loss_fn(output, target)
      loss.backward()
      xm.optimizer_step(optimizer)
      tracker.add(FLAGS['batch_size'])
      if x % FLAGS['log_steps'] == 0:
        print('[xla:{}]({}) Loss={:.2f} Time={}'.format(xm.get_ordinal(), x, loss.item(), time.asctime()), flush=True)

  def test_loop_fn(loader):
    total_samples = 0
    correct = 0
    model.eval()
    data, pred, target = None, None, None
    for data, target in loader:
      output = model(data)
      pred = output.max(1, keepdim=True)[1]
      correct += pred.eq(target.view_as(pred)).sum().item()
      total_samples += data.size()[0]

    accuracy = 100.0 * correct / total_samples
    print('[xla:{}] Accuracy={:.2f}%'.format(
        xm.get_ordinal(), accuracy), flush=True)
    return accuracy, data, pred, target

  # 训练和评估的循环
  accuracy = 0.0
  data, pred, target = None, None, None
  for epoch in range(1, FLAGS['num_epochs'] + 1):
    para_loader = pl.ParallelLoader(train_loader, [device])
    train_loop_fn(para_loader.per_device_loader(device))
    xm.master_print("Finished training epoch {}".format(epoch))

    para_loader = pl.ParallelLoader(test_loader, [device])
    accuracy, data, pred, target  = test_loop_fn(para_loader.per_device_loader(device))
    if FLAGS['metrics_debug']:
      xm.master_print(met.metrics_report(), flush=True)

  return accuracy, data, pred, target

现在,我们将开始ResNet50的训练。训练将在我们在参数中定义的50个epoch内完成。训练开始前,我们会记录训练时间,训练结束后,我们将打印总时间。

start_time = time.time()
# 启动训练流程
def training(rank, flags):
  global FLAGS
  FLAGS = flags
  torch.set_default_tensor_type('torch.FloatTensor')
  accuracy, data, pred, target = train_resnet50()
  if rank == 0:
    # 检索TPU核心0上的张量并绘制。
    plot_results(data.cpu(), pred.cpu(), target.cpu())

xmp.spawn(training, args=(FLAGS,), nprocs=FLAGS['num_cores'],
          start_method='fork')

使用TPU在PyTorch中实现ResNet50第4张

使用TPU在PyTorch中实现ResNet50第5张
使用TPU在PyTorch中实现ResNet50第6张

训练结束后,我们会打印训练过程所花费的时间。

使用TPU在PyTorch中实现ResNet50第7张

最后,在训练过程中,我们将模型对样本测试数据的预测可视化。

end_time = time.time()
print("Time taken = ", end_time-start_time)

使用TPU在PyTorch中实现ResNet50第8张

原文链接:https://analyticsindiamag.com/hands-on-guide-to-implement-resnet50-in-pytorch-with-tpu/

欢迎关注磐创AI博客站:
http://panchuang.net/

sklearn机器学习中文官方文档:
http://sklearn123.com/

欢迎关注磐创博客资源汇总站:
http://docs.panchuang.net/

免责声明:文章转载自《使用TPU在PyTorch中实现ResNet50》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇ts变量类型和编译文件tsconfig.json简单配置纯javascript对撤销和重写(undo、redo)的完美实现,适用于任何页面元素操作下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

js-定时器

知识 要用定时器,先清除定时器   1、 定时器   setInterval() 循环定时器;周而复始的执行(循环执行)  setTimeout()( 执行事件,间隔时间(单位毫秒)) <script> var num=0; setInterval(function(){ console.log( num);...

scan chain的原理和实现——5.UDTP

UDTP(user defined test point) 指示DFTC在设计中用户指定的位置插入控制点和观察点 1.为什么要使用UDTP? 修复无法控制的clock和/或asynch pins;  增加设计的测试覆盖率;  减少pattern数量 2.UDTP的类型 ①Force force_0、force_1、force_01、force_z0、for...

Vagrant系列(二)Vagrant的配置文件Vagrantfile详解

一、简介 在我们的工作目录下有一个Vagrantfile文件,里面包含有大量的配置信息,通过它可以定义虚拟机的各种配置,如网络、内存、主机名等,主要包括三个方面的配置,虚拟机的配置、SSH配置、Vagrant的一些基础配置。Vagrant是使用Ruby开发的,所以它的配置语法也是Ruby的,每个项目都需要有一个Vagrantfile,在执行vagrant...

利用LSTM(长短期记忆网络)来处理脑电数据

目录 LSTM 原理介绍 LSTM的核心思想 一步一步理解LSTM 代码案例 本分享为脑机学习者Rose整理发表于公众号:脑机接口社区(微信号:Brain_Computer).QQ交流群:903290195 Rose小哥今天介绍一下用LSTM来处理脑电数据。 LSTM 原理介绍 LSTMs(Long Short Term Memory net...

Jenkins执行python脚本

构建选择Excute Windows batch command 下面是python脚本,注意字符集GBK runtest.py #-*-coding:GBK -*- importsys importtime importpymysql importrequests #print(sys.argv[1]) ids_out =[] #用","分割,得...

简道云--公式与函数的使用教程

公式与函数 在制作表单时,可以设置控件与控件之间的数据联动关系。给例如编辑完单价和数量后,自动计算总价等这样的业务场景提供了支撑。 公式面板左侧可以选择当前表单控件所对应的值,以及所有表单控件所对应的字段名。被选择后,在公式面板中会以反引号包裹的形式显示。 注意:函数在简道云里的设置是大写,即在运用函数的时候,请用纯大写字母。 表单控件与其返回值的数据类型...