# Source: llvm_ir_dataset_utils.tools.upload_dataset_hf
"""A script for uploading a dataset in the form of a folder of parquet files
to huggingface.
"""
import logging
import os
from absl import app
from absl import flags
import ray
from huggingface_hub import HfApi
from huggingface_hub import CommitOperationAdd
from huggingface_hub import preupload_lfs_files
from huggingface_hub import create_commit
# Shared absl flag registry; parsed by app.run() before main() executes.
FLAGS = flags.FLAGS
# Root directory laid out as <dataset_dir>/<language>/<file>.parquet.
flags.DEFINE_string('dataset_dir', None,
'The path to the folder containing the parquet files.')
# Message used for the final commit that closes out the upload.
flags.DEFINE_string('commit_message', None,
'Git commit message for the upload.')
# Resume support: file names lexicographically <= this value are skipped.
flags.DEFINE_string('start_after', None, 'A specific path to start at.')
# Batch size for intermediate commits while uploads are still in flight.
flags.DEFINE_integer('operations_per_commit', 50,
'The number of operations to cache before committing')
flags.mark_flag_as_required('dataset_dir')
flags.mark_flag_as_required('commit_message')
@ray.remote(num_cpus=4)
def upload_file(api, full_file_path, file_to_upload):
  """Preuploads one local file as an LFS blob of the ComPile dataset repo.

  Runs as a ray task. The blob is staged server-side only; the returned
  CommitOperationAdd must still be included in a later create_commit call.

  Args:
    api: An HfApi handle (currently unused by this task; kept for the
      caller's interface).
    full_file_path: Absolute/local path of the parquet file to read.
    file_to_upload: Repo-relative path (language folder + file name).

  Returns:
    (True, CommitOperationAdd) on success, or
    (False, full_file_path, file_to_upload) so the caller can resubmit.
  """
  repo_destination = 'data/' + file_to_upload
  try:
    add_operation = CommitOperationAdd(
        path_in_repo=repo_destination, path_or_fileobj=full_file_path)
    preupload_lfs_files(
        'llvm-ml/ComPile', additions=[add_operation], repo_type='dataset')
    # warning level so the progress line survives ray's default log filtering.
    logging.warning(f'Finished uploading {file_to_upload}')
    return (True, add_operation)
  except Exception as e:
    # Best-effort: report the failure and hand the arguments back for retry.
    logging.error(f'Ran into an error, retrying {file_to_upload}: {e}')
    return (False, full_file_path, file_to_upload)
def main(_):
  """Uploads every parquet file under dataset_dir to the HF dataset repo.

  Fans out one ray task per file, then drains the futures: successful
  preuploads are batched into commits of FLAGS.operations_per_commit
  operations, and failed uploads are resubmitted until they succeed.
  """
  logging.info('Starting the upload')
  api = HfApi()
  file_upload_futures = []
  for language_folder in os.listdir(FLAGS.dataset_dir):
    for file_name in os.listdir(
        os.path.join(FLAGS.dataset_dir, language_folder)):
      # Resume support: skip anything at or before the last uploaded name.
      if FLAGS.start_after and file_name <= FLAGS.start_after:
        logging.info(f'Skipping uploading {file_name}')
        continue
      full_file_path = os.path.join(FLAGS.dataset_dir, language_folder,
                                    file_name)
      file_to_upload = os.path.join(language_folder, file_name)
      file_upload_futures.append(
          upload_file.remote(api, full_file_path, file_to_upload))
  current_operations = []
  while len(file_upload_futures) > 0:
    completed_uploads, file_upload_futures = ray.wait(
        file_upload_futures, timeout=5)
    logging.info(
        f'Just finished {len(completed_uploads)}, {len(file_upload_futures)} remaining.'
    )
    returned_uploads = ray.get(completed_uploads)
    for returned_upload in returned_uploads:
      if returned_upload[0]:
        # Success: (True, CommitOperationAdd).
        current_operations.append(returned_upload[1])
      else:
        # Failure tuples are (False, full_file_path, file_to_upload). The
        # previous code resubmitted indices 0 and 1, i.e. the boolean flag
        # as the file path, so retries could never succeed.
        file_upload_futures.append(
            upload_file.remote(api, returned_upload[1], returned_upload[2]))
    # Commit once the batch reaches the configured size (previously only
    # at size + 1, contradicting the flag's documentation).
    if len(current_operations) >= FLAGS.operations_per_commit:
      # NOTE(review): intermediate commits keep the original hard-coded
      # message; only the final commit uses FLAGS.commit_message.
      create_commit(
          'llvm-ml/ComPile',
          operations=current_operations,
          commit_message='Add additional data',
          repo_type='dataset')
      current_operations = []
  # Final commit with the user-supplied message; skipped when there is
  # nothing left to commit.
  if current_operations:
    create_commit(
        'llvm-ml/ComPile',
        operations=current_operations,
        commit_message=FLAGS.commit_message,
        repo_type='dataset')
# Script entry point: absl parses the flags defined above, then calls main.
if __name__ == '__main__':
  app.run(main)