Shortcuts

Source code for mmeval.core.dist_backends.oneflow_dist

# Copyright (c) OpenMMLab. All rights reserved.

import numpy as np
import pickle
from typing import TYPE_CHECKING, Any, List, Tuple, TypeVar, Union

from mmeval.utils import try_import
from .base_backend import TensorBaseDistBackend

# Static type checkers get real imports so that annotations resolve; at
# runtime the modules are obtained through `try_import` instead.
# `OneFlowDist.__init__` checks `flow is None`, so `try_import` presumably
# yields None when the package is unavailable -- TODO confirm against
# `mmeval.utils.try_import`.
if TYPE_CHECKING:
    import oneflow
    import oneflow as flow
    import oneflow.framework.check_point_v2 as check_point_v2
else:
    flow = try_import('oneflow')
    # NOTE(review): `check_point_v2` is not referenced by any code visible in
    # this file -- confirm whether it is still needed.
    check_point_v2 = try_import('oneflow.framework.check_point_v2')

# Type variable used in the tensor annotations below. The bound is given as a
# string, so evaluating this line does not need the `oneflow` name at runtime.
Tensor = TypeVar('Tensor', bound='oneflow.Tensor')


class OneFlowDist(TensorBaseDistBackend):
    """Distributed communication backend built on top of oneflow."""

    def __init__(self) -> None:
        super().__init__()
        # `flow` is looked up lazily at module import; a None value means the
        # oneflow package could not be obtained.
        if flow is None:
            raise ImportError(f'For availability of {self.__class__.__name__},'
                              ' please install oneflow first.')

    @property
    def is_initialized(self) -> bool:
        """Whether the distributed environment has been initialized.

        Returns:
            bool: True when ``flow.env.get_world_size()`` succeeds, False
            when it raises ``ValueError``.
        """
        try:
            flow.env.get_world_size()
        except ValueError:
            return False
        return True

    @property
    def rank(self) -> int:
        """The rank index reported by ``flow.env.get_rank()``."""
        return flow.env.get_rank()

    @property
    def world_size(self) -> int:
        """The world size reported by ``flow.env.get_world_size()``."""
        return flow.env.get_world_size()

    def _object_to_tensor(self, obj: Any) -> Tuple[Tensor, Tensor]:
        """Serialize an object with `pickle.dumps` and wrap it in a tensor.

        Args:
            obj (any): Any pickle-able python object.

        Returns:
            Tuple: The int8 tensor holding the pickled bytes, and a
            one-element tensor holding the number of elements in it.
        """
        payload = pickle.dumps(obj)
        # Reinterpret the pickled bytes as int8 values without copying.
        byte_view = np.frombuffer(payload, dtype=np.int8)
        data_tensor = flow.tensor(byte_view)
        length_tensor = flow.tensor([data_tensor.numel()])
        return data_tensor, length_tensor

    def _tensor_to_object(self, tensor: Tensor,
                          tensor_size: Union[int, Tensor]) -> Any:
        """Deserialize an object from a tensor with `pickle.loads`.

        Args:
            tensor (Tensor): A tensor-like data.
            tensor_size (int or Tensor): The number of leading bytes of the
                given tensor that are passed to `pickle.loads`.

        Returns:
            Any: The object converted from the given tensor.
        """
        num_bytes = int(tensor_size)
        raw_bytes = tensor.cpu().numpy().tobytes()
        # Bytes past `num_bytes` are dropped before unpickling.
        return pickle.loads(raw_bytes[:num_bytes])

    def _pad_tensor(self, tensor: Tensor,
                    max_size: Union[int, Tensor]) -> Tensor:
        """Pad the given tensor with zeros up to the given size.

        Args:
            tensor (Tensor): A tensor-like data to be padded.
            max_size (int or Tensor): The total number of elements the
                padded tensor should have.

        Returns:
            Tensor: The padded tensor.
        """
        target_size = int(max_size)
        pad_length = target_size - tensor.numel()
        zero_fill = flow.zeros((pad_length, ),
                               dtype=flow.int8,
                               device=tensor.device)
        return flow.cat((tensor, zero_fill), dim=0)

    def _all_gather(self, tensor: Tensor) -> List[Tensor]:
        """All gather the given tensor.

        Args:
            tensor (Tensor): The tensor for all gather.

        Returns:
            list: A list of the gathered tensors, one per rank.
        """
        # One receive buffer per rank, each shaped like the local tensor.
        gathered = []
        for _ in range(self.world_size):
            gathered.append(flow.empty_like(tensor).to(tensor.device))
        flow.comm.all_gather(gathered, tensor)
        return gathered

    def _broadcast(self, tensor: Tensor, src: int = 0) -> Tensor:
        """Broadcast the given tensor from the source rank.

        Args:
            tensor (Tensor): The tensor for broadcast.
            src (int): The source rank index.

        Returns:
            Tensor: The same tensor object that was passed in, after the
            broadcast call.
        """
        flow.comm.broadcast(tensor, src=src)
        return tensor
Read the Docs v: latest
Versions
latest
stable
Downloads
pdf
html
epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.