Source code for llvm_ir_dataset_utils.tools.process_to_parquet
"""This is a script that allows for the conversion of a deduplicated dataset
into a parquet dataset for distribution.
"""
import logging
import os
import sys
import glob
from absl import app
from absl import flags
import pandas
import pyarrow
import ray
from pyarrow import parquet
from llvm_ir_dataset_utils.util import dataset_corpus
FLAGS = flags.FLAGS
flags.DEFINE_multi_string('corpus_dir', None,
                          'The corpus to pull bitcode from.')
flags.DEFINE_integer('max_batches', sys.maxsize,
                     'The maximum number of projects to process.')
flags.DEFINE_string('output_path', None,
                    'The output path to place the parquet files in.')
flags.DEFINE_integer('chunk_size', 500, 'The number of MB per parquet file.')
flags.mark_flag_as_required('corpus_dir')
flags.mark_flag_as_required('output_path')
@ray.remote(num_cpus=1)
def process_single_batch(batch_dirs, dataset_path, corpus_name):
  bitcode_paths = []
  license_information = {}

  # Gather the relative path of every bitcode file in each batch and merge the
  # per-batch license metadata into a single map.
  for batch_dir in batch_dirs:
    try:
      new_bitcode_paths = [
          (batch_dir, bitcode_path)
          for bitcode_path in dataset_corpus.get_bitcode_file_paths(batch_dir)
      ]
      bitcode_paths.extend(new_bitcode_paths)
      license_information.update(
          dataset_corpus.load_json_from_corpus(batch_dir,
                                               './license_info.json'))
    except Exception:
      logging.warning('Failed to get bitcode_paths')
      continue

  module_content = []
  license_expression = []
  license_source = []
  license_file = []
  package_source = []

  for bitcode_path_info in bitcode_paths:
    batch_dir, bitcode_path = bitcode_path_info
    bitcode_file_data = dataset_corpus.load_file_from_corpus(
        batch_dir, bitcode_path)
    module_content.append(bitcode_file_data)
    # Strip the leading './' and the trailing '.bc' extension so that only the
    # raw module hash remains; the hash is the key into the license map.
    bitcode_license_info = license_information[bitcode_path[2:-3]]
    license_expression.append(bitcode_license_info[0])
    license_source.append(bitcode_license_info[1])
    license_file.append(bitcode_license_info[2])
    package_source.append(bitcode_license_info[3])

  assert len(corpus_name) > 0

  dataframe = pandas.DataFrame.from_dict({
      'content': module_content,
      'license_expression': license_expression,
      'license_source': license_source,
      'license_files': license_file,
      'package_source': package_source,
      'language': corpus_name
  })

  table = pyarrow.Table.from_pandas(dataframe, preserve_index=False)
  parquet.write_table(table, dataset_path, compression='NONE')


def main(_):
  corpus_projects_list = {}

  for corpus_dir in FLAGS.corpus_dir:
    new_projects_list = os.listdir(corpus_dir)
    corpus_name = os.path.basename(os.path.abspath(corpus_dir))
    corpus_projects_list[corpus_name] = []
    for project_path in new_projects_list:
      corpus_projects_list[corpus_name].append((corpus_dir, project_path))

    # Create a directory for each of the output corpora.
    os.mkdir(os.path.join(FLAGS.output_path, corpus_name))

  total_project_count = 0
  for corpus_name in corpus_projects_list:
    total_project_count += len(corpus_projects_list[corpus_name])

  logging.info(f'Processing {total_project_count} projects')

  current_parquet_size = 0
  current_parquet_paths = []
  current_parquet_index = 0

  parquet_batches = []

  # Greedily group project directories into chunks of roughly --chunk_size MB;
  # each chunk is written out as a single parquet file.
  for corpus_name in corpus_projects_list:
    for index, project_info in enumerate(corpus_projects_list[corpus_name]):
      corpus_dir, project_dir = project_info
      batch_path = os.path.join(corpus_dir, project_dir)
      # Size of the batch in MiB.
      batch_size = os.stat(batch_path).st_size / (2**20)
      current_parquet_paths.append(batch_path)
      current_parquet_size += batch_size

      if current_parquet_size > FLAGS.chunk_size:
        parquet_batches.append(
            (current_parquet_index, current_parquet_paths, corpus_name))
        current_parquet_index += 1
        current_parquet_paths = []
        current_parquet_size = 0

      if index >= FLAGS.max_batches:
        break

    # If we've finished a corpus and haven't already put everything into a
    # parquet file, we need to flush everything at this point.
    if len(current_parquet_paths) > 0:
      parquet_batches.append(
          (current_parquet_index, current_parquet_paths, corpus_name))
      current_parquet_index += 1
      current_parquet_paths = []
      current_parquet_size = 0

  # Dispatch one Ray task per chunk; each task writes a train-<index>.parquet
  # file into the corpus-specific output directory.
  parquet_batch_futures = []
  for parquet_batch in parquet_batches:
    parquet_index, parquet_paths, corpus_name = parquet_batch
    parquet_batch_futures.append(
        process_single_batch.remote(
            parquet_paths,
            os.path.join(FLAGS.output_path, corpus_name,
                         f'train-{parquet_index}.parquet'), corpus_name))

  # Wait for the remote tasks to finish, logging progress as batches complete.
  while len(parquet_batch_futures) > 0:
    finished, parquet_batch_futures = ray.wait(parquet_batch_futures, timeout=5)
    logging.info(
        f'Just finished {len(finished)}, {len(parquet_batch_futures)} remaining.'
    )


if __name__ == '__main__':
  app.run(main)