#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from py4j.protocol import Py4JError
from bigdl.orca.data.utils import *
from bigdl.orca import OrcaContext
from bigdl.dllib.nncontext import init_nncontext, ZooContext
from bigdl.dllib.utils.common import *
from bigdl.dllib.utils import nest
from bigdl.dllib.utils.log4Error import *
import numpy as np
class XShards(object):
    """
    A collection of data which can be pre-processed in parallel.

    Base class; SparkXShards (below) is the Spark-backed implementation.
    """

    def collect(self):
        """
        Returns a list that contains all of the elements in this XShards
        Implemented by subclasses; the base version returns None.
        :return: list of elements
        """
        pass

    def num_partitions(self):
        """
        return the number of partitions in this XShards
        Implemented by subclasses; the base version returns None.
        :return: an int
        """
        pass

    @classmethod
    def load_pickle(cls, path, minPartitions=None):
        """
        Load XShards from pickle files.
        :param path: The pickle file path/directory
        :param minPartitions: The minimum partitions for the XShards
        :return: SparkXShards object
        """
        sc = init_nncontext()
        return SparkXShards(sc.pickleFile(path, minPartitions))

    @staticmethod
    def partition(data, num_shards=None):
        """
        Partition local in memory data and form a SparkXShards
        :param data: np.ndarray, a tuple, list, dict of np.ndarray, or a nested structure
               made of tuple, list, dict with ndarray as the leaf value
        :param num_shards: the number of shards that the data will be partitioned into.
               Defaults to node number * core number.
        :return: a SparkXShards whose RDD has num_shards partitions, each holding one shard
        """
        sc = init_nncontext()
        node_num, core_num = get_node_and_core_number()
        shard_num = node_num * core_num if num_shards is None else num_shards
        invalidInputError(shard_num > 0,
                          "num_shards should be a positive integer, but got {}".format(shard_num))
        type_err_msg = """
The types supported in bigdl.orca.data.XShards.partition are
1. np.ndarray
2. a tuple, list, dict of np.ndarray
3. nested structure made of tuple, list, dict with ndarray as the leaf value
But got data of type {}
        """.format(type(data))
        supported_types = {list, tuple, dict}

        def check_length(data_length):
            # Every shard must receive at least one record.
            if data_length < shard_num:
                invalidInputError(False,
                                  "The length of data {} is smaller than the total number "
                                  "of shards {}. Please adjust the num_shards option to be "
                                  "at most {}.".format(data_length, shard_num, data_length))

        if isinstance(data, np.ndarray):
            check_length(data.shape[0])
            shards = np.array_split(data, shard_num)
        else:
            invalidInputError(type(data) in supported_types, type_err_msg)
            flattened = nest.flatten(data)
            invalidInputError(len(flattened) > 0,
                              "data should contain at least one ndarray, but got an empty "
                              "structure")
            data_length = len(flattened[0])
            check_length(data_length)
            split_leaves = []
            for x in flattened:
                invalidInputError(len(x) == data_length,
                                  "the ndarrays in data must all have the same size in first"
                                  " dimension, got first ndarray of size {} and"
                                  " another {}".format(data_length, len(x)))
                split_leaves.append(np.array_split(x, shard_num))
            # Shard i gets the i-th piece of every leaf (in flattened order), re-packed
            # into the same nested structure as the input.
            shards = [nest.pack_sequence_as(data, list(pieces))
                      for pieces in zip(*split_leaves)]
        # Pass numSlices explicitly: otherwise Spark splits the list by its default
        # parallelism, which may differ from shard_num and leave some partitions empty
        # or holding several shards.
        rdd = sc.parallelize(shards, shard_num)
        return SparkXShards(rdd)
class SparkXShards(XShards):
    """
    A collection of data which can be pre-processed in parallel on Spark.

    The shards live in ``self.rdd``; each RDD element is one shard (for example a pandas
    DataFrame, a numpy ndarray, a list or a dict of ndarrays). ``self.type`` caches
    information detected from the first shard: 'class_name' and, for pandas DataFrame
    shards, 'schema' (column names and dtypes) and 'spark_df_schema'.
    """

    def __init__(self, rdd, transient=False, class_name=None):
        """
        :param rdd: the pyspark RDD holding the shards.
        :param transient: if True, the RDD is neither cached nor eagerly computed here.
        :param class_name: optional fully qualified class name of the shards
               (e.g. 'pandas.core.frame.DataFrame'); if given it is not detected later.
        """
        self.rdd = rdd
        self.user_cached = False
        if transient:
            self.eager = False
        else:
            self.eager = OrcaContext._eager_mode
            self.rdd.cache()
        if self.eager:
            # Eager mode: run a Spark action immediately (see compute()).
            self.compute()
        self.type = {}
        if class_name:
            self.type['class_name'] = class_name

    def transform_shard(self, func, *args):
        """
        Return a new SparkXShards by applying a function to each shard of this SparkXShards
        :param func: python function to process data. The first argument is the data shard.
        :param args: other arguments in this function.
        :return: a new SparkXShards.
        """
        def transform(iterator, func, *args):
            for x in iterator:
                yield func(x, *args)

        transformed_shard = SparkXShards(self.rdd.mapPartitions(
            lambda iterator: transform(iterator, func, *args)))
        # Drop this shard's cache unless the user cached it explicitly.
        self._uncache()
        return transformed_shard

    def collect(self):
        """
        Returns a list that contains all of the elements in this SparkXShards
        :return: a list of data elements.
        """
        return self.rdd.collect()

    def cache(self):
        """
        Persist this SparkXShards in memory and mark it as cached by the user, so that
        internal operations do not unpersist it.
        :return: self
        """
        self.user_cached = True
        self.rdd.cache()
        return self

    def uncache(self):
        """
        Make this SparkXShards as non-persistent, and remove all blocks for it from memory
        :return: self
        """
        self.user_cached = False
        if self.is_cached():
            try:
                self.rdd.unpersist()
            except Py4JError:
                print("Try to unpersist an uncached rdd")
        return self

    def _uncache(self):
        # Internal release: only unpersist when the user did not call cache().
        if not self.user_cached:
            self.uncache()

    def is_cached(self):
        return self.rdd.is_cached

    def compute(self):
        """
        Materialize the RDD by running a count() action.
        :return: self
        """
        self.rdd.count()
        return self

    def num_partitions(self):
        """
        Get number of partitions for this SparkXShards.
        :return: number of partitions.
        """
        return self.rdd.getNumPartitions()

    def repartition(self, num_partitions):
        """
        Return a new SparkXShards that has exactly num_partitions partitions.

        Shards of pandas DataFrame, list, numpy ndarray (with at least one dimension) and
        dict of such ndarrays are split into records, redistributed and merged back into
        at most one shard per partition. Other shards are moved as whole elements.

        :param num_partitions: target number of partitions
        :return: a new SparkXShards object.
        """
        class_name = self._get_class_name()
        if class_name == 'pandas.core.frame.DataFrame':
            repartitioned_shard = self._repartition_pandas(num_partitions)
        elif class_name == 'builtins.list':
            repartitioned_shard = self._repartition_list(num_partitions)
        elif class_name == 'numpy.ndarray':
            repartitioned_shard = self._repartition_ndarray(num_partitions)
        elif class_name == "builtins.dict":
            repartitioned_shard = self._repartition_dict(num_partitions)
        else:
            repartitioned_shard = SparkXShards(self.rdd.repartition(num_partitions))
        self._uncache()
        return repartitioned_shard

    def _repartition_pandas(self, num_partitions):
        """Repartition pandas DataFrame shards, producing one DataFrame per partition."""
        import pandas as pd
        if num_partitions > self.rdd.getNumPartitions():
            def to_keyed_rows(df):
                # (first-column value, row values) pairs; the key only picks the partition.
                # iloc is used because positional Series[int] access is deprecated and
                # is label-based for integer column labels.
                return df.apply(lambda row: (row.iloc[0], row.values.tolist()),
                                axis=1).values.tolist()

            rdd = self.rdd.flatMap(to_keyed_rows).partitionBy(num_partitions)
            schema = self.get_schema()

            def merge_rows(iterator):
                data = [value[1] for value in iterator]
                if not data:
                    # no data in this partition
                    return []
                return [pd.DataFrame(data=data, columns=schema['columns'])
                        .astype(schema['dtypes'])]

            return SparkXShards(rdd.mapPartitions(merge_rows))

        def combine_df(iterator):
            dfs = list(iterator)
            return [pd.concat(dfs)] if dfs else []

        rdd = self.rdd.coalesce(num_partitions)
        return SparkXShards(rdd.mapPartitions(combine_df))

    def _repartition_list(self, num_partitions):
        """Repartition list shards, producing one list per partition."""
        if num_partitions > self.rdd.getNumPartitions():
            rdd = self.rdd \
                .flatMap(lambda data: data) \
                .repartition(num_partitions)
            return SparkXShards(rdd.mapPartitions(lambda iterator: [list(iterator)]))

        def concat_lists(iterator):
            from itertools import chain
            lists = list(iterator)
            # Linear-time concatenation; an empty partition yields no shard instead of
            # failing in reduce() without an initial value.
            return [list(chain.from_iterable(lists))] if lists else []

        rdd = self.rdd.coalesce(num_partitions)
        return SparkXShards(rdd.mapPartitions(concat_lists))

    def _repartition_ndarray(self, num_partitions):
        """Repartition ndarray shards along the first axis, one array per partition."""
        elem = self.rdd.first()
        shape = elem.shape
        dtype = elem.dtype
        if len(shape) == 0:
            # 0-d arrays have no rows to split; move them as whole elements.
            return SparkXShards(self.rdd.repartition(num_partitions))
        if num_partitions > self.rdd.getNumPartitions():
            def stack_rows(iterator):
                rows = list(iterator)
                if not rows:
                    return []
                return [np.stack(rows, axis=0).astype(dtype)]

            rdd = self.rdd \
                .flatMap(lambda data: list(data)) \
                .repartition(num_partitions)
            return SparkXShards(rdd.mapPartitions(stack_rows))

        def concat_arrays(iterator):
            arrays = list(iterator)
            return [np.concatenate(arrays, axis=0)] if arrays else []

        rdd = self.rdd.coalesce(num_partitions)
        return SparkXShards(rdd.mapPartitions(concat_arrays))

    def _repartition_dict(self, num_partitions):
        """Repartition dict-of-ndarray shards, one dict per partition."""
        elem = self.rdd.first()
        keys = list(elem.keys())
        dtypes = []
        # Only dicts whose values are all ndarrays with at least one dimension are split
        # into records; anything else is moved as whole elements.
        for v in elem.values():
            if v.__class__.__name__ != "ndarray" or len(v.shape) == 0:
                return SparkXShards(self.rdd.repartition(num_partitions))
            dtypes.append(v.dtype)

        if num_partitions > self.rdd.getNumPartitions():
            def dict_to_unbatched_list(d):
                values = [list(d[k]) for k in keys]
                return list(zip(*values))

            def to_batched_dict(iterator):
                batch_values = list(zip(*iterator))
                if not batch_values:
                    return []
                batch_ndarrays = [np.stack(v, axis=0).astype(dtype)
                                  for v, dtype in zip(batch_values, dtypes)]
                return [dict(zip(keys, batch_ndarrays))]

            # With few records repartition() may leave some partitions empty;
            # to_batched_dict yields no shard for those.
            rdd = self.rdd.flatMap(dict_to_unbatched_list).repartition(num_partitions)
            return SparkXShards(rdd.mapPartitions(to_batched_dict))

        def merge_list_of_dict(iterator):
            dicts = list(iterator)
            if not dicts:
                return []
            return [{k: np.concatenate([d[k] for d in dicts], axis=0) for k in keys}]

        rdd = self.rdd.coalesce(num_partitions)
        return SparkXShards(rdd.mapPartitions(merge_list_of_dict))

    def partition_by(self, cols, num_partitions=None):
        """
        Return a new SparkXShards partitioned using the specified columns.
        This is only applicable for SparkXShards of Pandas DataFrame.
        :param cols: specified columns to partition by.
        :param num_partitions: target number of partitions. If not specified,
        the new SparkXShards would keep the current partition number.
        :return: a new SparkXShards.
        """
        if self._get_class_name() == 'pandas.core.frame.DataFrame':
            import pandas as pd
            schema = self.get_schema()
            # if partition by a column
            if isinstance(cols, str):
                if cols not in schema['columns']:
                    invalidInputError(False,
                                      "The partition column is not in the DataFrame")
                # change data to (partition key, row values) pairs
                rdd = self.rdd.flatMap(
                    lambda df: df.apply(lambda row: (row[cols], row.values.tolist()), axis=1)
                    .values.tolist())
                partition_num = self.rdd.getNumPartitions() if not num_partitions \
                    else num_partitions
                # partition with key
                partitioned_rdd = rdd.partitionBy(partition_num)
            else:
                invalidInputError(False,
                                  "Only support partition by a column name")

            def merge(iterator):
                data = [value[1] for value in iterator]
                if data:
                    df = pd.DataFrame(data=data, columns=schema['columns']) \
                        .astype(schema['dtypes'])
                    return [df]
                # no data in this partition
                return []

            # merge records to df in each partition
            partitioned_shard = SparkXShards(partitioned_rdd.mapPartitions(merge))
            self._uncache()
            return partitioned_shard
        else:
            invalidInputError(False,
                              "Currently only support partition by for XShards"
                              " of Pandas DataFrame")

    def unique(self):
        """
        Return a unique list of elements of this SparkXShards.
        This is only applicable for SparkXShards of Pandas Series.
        :return: a unique list of elements of this SparkXShards.
        """
        if self._get_class_name() == 'pandas.core.series.Series':
            import pandas as pd
            rdd = self.rdd.map(lambda s: s.unique())
            result = rdd.reduce(lambda list1, list2: pd.unique(np.concatenate((list1, list2),
                                                                              axis=0)))
            return result
        else:
            # we may support numpy or other types later
            invalidInputError(False,
                              "Currently only support unique() on XShards of Pandas Series")

    def deduplicates(self):
        """
        Return a new SparkXShards of pandas DataFrame with duplicated rows removed.
        The rows are de-duplicated by converting to a Spark DataFrame and calling distinct().
        :return: a new SparkXShards.
        """
        if self._get_class_name() == 'pandas.core.frame.DataFrame':
            df = self.to_spark_df()
            distinctDF = df.distinct()
            data_shards = spark_df_to_pd_sparkxshards(distinctDF)
            return data_shards
        else:
            # we may support numpy or other types later
            invalidInputError(False,
                              "Currently only support dedup() on XShards of Pandas DataFrame")

    def assembleFeatureLabelCols(self, featureCols, labelCols):
        """
        The api is used to merge/convert one or multiple feature columns into a numpy array,
        merge/convert one or multiple label columns into a numpy array.
        :param featureCols: a list of feature columns.
        :param labelCols: a list of label columns.
        :return: SparkXShards of dictionary; key "x" holds the stacked feature numpy array,
                 key "y" holds the stacked label numpy array.
        eg:
        shards: SparkXShards of pandas data frame with 9 cols ['f1', 'f2', 'f3', 'f4', 'f5',
                'f6', 'f7', 'f8', 'label']
            f1   f2  f3  f4   f5    f6     f7  f8  label
            6   148  72  35    0  33.6  0.627  50      1
            1    85  66  29    0  26.6  0.351  31      0
            8   183  64   0    0  23.3  0.672  32      1
            1    89  66  23   94  28.1  0.167  21      0
            0   137  40  35  168  43.1  2.288  33      1
        transform_shards =
          shards.assembleFeatureLabelCols(featureCols=['f1', 'f2', 'f3', 'f4', 'f5', 'f6',
                                                       'f7', 'f8'], labelCols=['label'])
        transform_shards will be SparkXShards of dictionary:
            {'x': array([[  6.   , 148.   ,  72.   , ...,  33.6  ,   0.627,  50.   ],
                         [  1.   ,  85.   ,  66.   , ...,  26.6  ,   0.351,  31.   ],
                         [  8.   , 183.   ,  64.   , ...,  23.3  ,   0.672,  32.   ],
                         [  1.   ,  89.   ,  66.   , ...,  28.1  ,   0.167,  21.   ],
                         [  0.   , 137.   ,  40.   , ...,  43.1  ,   2.288,  33.   ]]),
             'y': array([[1],
                         [0],
                         [1],
                         [0],
                         [1]])}
        """
        if self._get_class_name() != 'pandas.core.frame.DataFrame':
            invalidInputError(False,
                              "Currently only support assembleFeatureLabelCols() on"
                              " XShards of Pandas DataFrame")
        invalidInputError(isinstance(featureCols, list), "expect featureCols is a list")
        invalidInputError(isinstance(labelCols, list), "expect labelCols is a list")

        def to_shard_dict(df):
            featureLists = [df[feature_col].to_numpy() for feature_col in featureCols]
            labelLists = [df[label_col].to_numpy() for label_col in labelCols]
            return {
                "x": np.stack(featureLists, axis=1),
                "y": np.stack(labelLists, axis=1)}

        return self.transform_shard(to_shard_dict)

    def split(self):
        """
        Split SparkXShards into multiple SparkXShards.
        Each element in the SparkXShards needs be a list or tuple with same length.
        :return: Splits of SparkXShards. If element in the input SparkDataShard is not
        list or tuple (or the SparkXShards is empty), return list of input SparkDataShards.
        """
        # number of splits of every element (non list/tuple elements count as 1)
        split_lengths = self.rdd.map(
            lambda data: len(data) if isinstance(data, (list, tuple)) else 1).collect()
        # check if each element has same splits
        if len(set(split_lengths)) > 1:
            invalidInputError(False,
                              "Cannot split this XShards because its partitions "
                              "have different split length")
        if not split_lengths or split_lengths[0] <= 1:
            return [self]

        def get_data(order):
            def transform(data):
                return data[order]
            return transform

        split_shard_list = [SparkXShards(self.rdd.map(get_data(i)))
                            for i in range(split_lengths[0])]
        self._uncache()
        return split_shard_list

    def zip(self, other):
        """
        Zips this SparkXShards with another one, returning key-value pairs with the first element
        in each SparkXShards, second element in each SparkXShards, etc. Assumes that the two
        SparkXShards have the *same number of partitions* and the *same number of elements
        in each partition*(e.g. one was made through a transform_shard on the other
        :param other: another SparkXShards
        :return: zipped SparkXShards
        """
        invalidInputError(isinstance(other, SparkXShards), "other should be a SparkXShards")
        invalidInputError(self.num_partitions() == other.num_partitions(),
                          "The two SparkXShards should have the same number of partitions")
        try:
            rdd = self.rdd.zip(other.rdd)
            zipped_shard = SparkXShards(rdd)
            other._uncache()
            self._uncache()
            return zipped_shard
        except Exception:
            invalidInputError(False,
                              "The two SparkXShards should have the same number of elements "
                              "in each partition")

    def _to_spark_df_without_arrow(self):
        """Build a Spark DataFrame from the rows of each partition's pandas DataFrame."""
        def f(iterator):
            from bigdl.dllib.utils.log4Error import invalidInputError
            pdf_list = list(iterator)
            invalidInputError(len(pdf_list) == 1,
                              f"For XShards of pandas dataframe, expects there is only 1"
                              f" pandas dataframe for each partition, but got {len(pdf_list)}")
            np_records = pdf_list[0].to_records(index=False)
            return [r.tolist() for r in np_records]

        rdd = self.rdd.mapPartitions(f)
        column = self.get_schema()['columns']
        df = rdd.toDF(list(column))
        return df

    # to_spark_df adapted from pyspark
    # https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/conversion.py
    def to_spark_df(self):
        """
        Convert this SparkXShards of pandas DataFrame into a Spark DataFrame.

        First tries an Arrow-based path: each partition's DataFrame is written as an Arrow
        stream into a local temporary file and the file names are handed to the JVM-side
        orcaToDataFrame (presumably reading them back into a DataFrame). If anything in that
        path raises, falls back to _to_spark_df_without_arrow().
        :return: a Spark DataFrame.
        """
        if self._get_class_name() != 'pandas.core.frame.DataFrame':
            invalidInputError(False,
                              "Currently only support to_spark_df on XShards of Pandas DataFrame")
        try:
            import pyarrow as pa  # noqa: F401 -- fall back early when pyarrow is missing
            sdf_schema = self._get_spark_df_schema()
            sqlContext = get_spark_sql_context(get_spark_context())
            timezone = sqlContext._conf.sessionLocalTimeZone()

            def f(iterator):
                from bigdl.dllib.utils.log4Error import invalidInputError
                pdf_list = list(iterator)
                invalidInputError(len(pdf_list) == 1,
                                  f"For XShards of pandas dataframe, expects there is only 1"
                                  f" pandas dataframe for each partition, but got {len(pdf_list)}")
                pdf = pdf_list[0]
                from pyspark.sql.pandas.types import to_arrow_type
                from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
                from tempfile import NamedTemporaryFile, mkdtemp
                # Unique, private local directory (instead of a hard-coded /tmp path).
                tmp_dir = mkdtemp()
                arrow_types = [to_arrow_type(field.dataType) for field in sdf_schema.fields]
                # DataFrame.iteritems() was removed in pandas 2.0; items() works everywhere.
                arrow_data = [[(c, t) for (_, c), t in zip(pdf.items(), arrow_types)]]
                col_by_name = True
                safecheck = False
                ser = ArrowStreamPandasSerializer(timezone, safecheck, col_by_name)
                temp_file = NamedTemporaryFile(delete=False, dir=tmp_dir)
                try:
                    ser.dump_stream(arrow_data, temp_file)
                finally:
                    temp_file.close()
                return [temp_file.name]

            jiter = self.rdd.mapPartitions(f)
            from bigdl.dllib.utils.file_utils import callZooFunc
            df = callZooFunc("float", "orcaToDataFrame", jiter, sdf_schema.json(), sqlContext)
            return df
        except Exception as e:
            print(f"createDataFrame from shards attempted Arrow optimization failed as: {str(e)},"
                  f"Will try without Arrow optimization")
            return self._to_spark_df_without_arrow()

    def __len__(self):
        """
        Sum of len() over all shards; a shard without __len__ counts as 1.
        An empty SparkXShards has length 0.
        """
        return self.rdd.map(lambda data: len(data) if hasattr(data, '__len__') else 1).sum()

    def save_pickle(self, path, batchSize=10):
        """
        Save this SparkXShards as a SequenceFile of serialized objects.
        The serializer used is pyspark.serializers.PickleSerializer, default batch size is 10.
        :param path: target path.
        :param batchSize: batch size for each sequence file chunk.
        """
        self.rdd.saveAsPickleFile(path, batchSize)
        return self

    def __del__(self):
        # Instances created without __init__ (e.g. via __new__) have no rdd to release.
        if getattr(self, 'rdd', None) is not None:
            self.uncache()

    def __getitem__(self, key):
        """
        Return a transient SparkXShards where each shard is replaced by shard[key].
        """
        def get_data(data):
            invalidInputError(hasattr(data, '__getitem__'),
                              "No selection operation available for this XShards")
            try:
                value = data[key]
            except Exception:
                invalidInputError(False,
                                  "Invalid key for this XShards")
            return value

        return SparkXShards(self.rdd.map(get_data), transient=True)

    def _for_each(self, func, *args, **kwargs):
        """
        Return an RDD with func applied to every shard; an exception raised by func is
        returned as that shard's result instead of failing the job.
        """
        def utility_func(x, func, *args, **kwargs):
            try:
                result = func(x, *args, **kwargs)
            except Exception as e:
                return e
            return result

        result_rdd = self.rdd.map(lambda x: utility_func(x, func, *args, **kwargs))
        return result_rdd

    def _refresh_type_info(self):
        """Detect class name and schemas from the first shard and cache them in self.type."""
        class_name, pdf_schema, sdf_schema = self._get_schema_class_name()
        self.type['class_name'] = class_name
        self.type['schema'] = pdf_schema
        self.type['spark_df_schema'] = sdf_schema

    def get_schema(self):
        """
        Return {'columns': [...], 'dtypes': ...} for pandas DataFrame shards (cached after
        the first call), or None for other shard types.
        """
        if 'schema' in self.type:
            return self.type['schema']
        if 'class_name' not in self.type \
                or self.type['class_name'] == 'pandas.core.frame.DataFrame':
            self._refresh_type_info()
            return self.type['schema']
        return None

    def _get_spark_df_schema(self):
        """
        Return the Spark StructType schema for pandas DataFrame shards (cached), or None.
        """
        if 'spark_df_schema' in self.type:
            return self.type['spark_df_schema']
        if 'class_name' not in self.type \
                or self.type['class_name'] == 'pandas.core.frame.DataFrame':
            self._refresh_type_info()
            return self.type['spark_df_schema']
        return None

    def _get_class_name(self):
        """Return the fully qualified class name of the shards (cached)."""
        if 'class_name' not in self.type:
            self._refresh_type_info()
        return self.type['class_name']

    def _get_schema_class_name(self):
        """
        Inspect the first shard and return (class_name, pandas_schema, spark_df_schema).
        The two schemas are None unless the shards are pandas DataFrames; the Spark schema
        is only built on Spark 3 or newer.
        """
        class_name = self.type.get('class_name')
        import pyspark
        spark_version = pyspark.version.__version__
        # Compare numerically: string comparison would treat "10" as older than "3".
        major_version = int(spark_version.split(".")[0])

        def func(pdf):
            pdf_schema = None
            spark_df_schema = None
            _class_name = class_name
            if not _class_name:
                _class_name = pdf.__class__.__module__ + '.' + pdf.__class__.__name__
            if _class_name == 'pandas.core.frame.DataFrame':
                schema = [str(x) if not isinstance(x, str) else x for x in pdf.columns]
                pdf_schema = {'columns': schema, 'dtypes': pdf.dtypes}
                if major_version >= 3:
                    import pyarrow as pa
                    from pyspark.sql.pandas.types import from_arrow_type
                    from pyspark.sql.types import StructType
                    arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
                    struct = StructType()
                    for name, field in zip(schema, arrow_schema):
                        struct.add(
                            name, from_arrow_type(field.type), nullable=field.nullable
                        )
                    spark_df_schema = struct
            return (_class_name, pdf_schema, spark_df_schema)

        return self.rdd.map(func).first()
class SharedValue(object):
    """
    Wraps a value in a Spark broadcast variable so it can be shared with executors.
    """

    def __init__(self, data):
        """
        :param data: the value to broadcast through the SparkContext from init_nncontext().
        """
        spark_context = init_nncontext()
        self.broadcast_data = spark_context.broadcast(data)
        # Last value read through the `value` property (None until first access).
        self._value = None

    @property
    def value(self):
        """
        The broadcast value, re-read from the broadcast variable on every access.
        """
        current = self.broadcast_data.value
        self._value = current
        return current

    def unpersist(self):
        """
        Call unpersist() on the underlying broadcast variable.
        """
        self.broadcast_data.unpersist()
def spark_df_to_ray_dataset(df):
    """
    Convert a Spark DataFrame to a Ray Dataset.

    The DataFrame is first converted with spark_df_to_pd_sparkxshards, and the resulting
    SparkXShards is handed to spark_xshards_to_ray_dataset. The block number of the Ray
    Dataset equals the partition number of the input DataFrame.

    :param df: A Spark dataframe.
    :return: A Ray Dataset holding Arrow records read from the dataframe.
    """
    return spark_xshards_to_ray_dataset(spark_df_to_pd_sparkxshards(df))