diff --git a/README.md b/README.md index 45b9b67ec..5f567ed4f 100644 --- a/README.md +++ b/README.md @@ -217,9 +217,9 @@ The stack of DataPipes can then be constructed in functional form: ```py >>> import torch.utils.data.datapipes as dp ->>> datapipes1 = dp.iter.FileLoader(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2) +>>> datapipes1 = dp.iter.FileOpener(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2) ->>> datapipes2 = dp.iter.FileLoader(['a.file', 'b.file']) +>>> datapipes2 = dp.iter.FileOpener(['a.file', 'b.file']) >>> datapipes2 = dp.iter.Mapper(datapipes2) >>> datapipes2 = dp.iter.Shuffler(datapipes2) >>> datapipes2 = dp.iter.Batcher(datapipes2, 2) @@ -259,7 +259,7 @@ Then, the pipeline can be assembled as follows: >>> FOLDER = 'path/2/csv/folder' >>> datapipe = dp.iter.FileLister([FOLDER]).filter(fn=lambda filename: filename.endswith('.csv')) ->>> datapipe = dp.iter.FileLoader(datapipe, mode='rt') +>>> datapipe = dp.iter.FileOpener(datapipe, mode='rt') >>> datapipe = datapipe.parse_csv_files(delimiter=' ') >>> for d in datapipe: # Start loading data diff --git a/examples/text/CC100.ipynb b/examples/text/CC100.ipynb index 1a047bc62..146a76719 100644 --- a/examples/text/CC100.ipynb +++ b/examples/text/CC100.ipynb @@ -1,215 +1,202 @@ { - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "source": [ - "import torch\n", - "import os\n", - "\n", - "from torchdata.datapipes.iter import (\n", - " FileLoader,\n", - " HttpReader,\n", - " IterableWrapper,\n", - " SampleMultiplexer,\n", - ")\n", - "\n", - "ROOT_DIR = os.path.expanduser('~/.torchdata/CC100') # This directory needs to be crated and set" - ], - "outputs": [], - "metadata": {} - }, - { - "cell_type": "code", - "execution_count": 2, - "source": [ - "# CC100 support (http://data.statmt.org/cc-100/)\n", - "\n", - "URL=\"http://data.statmt.org/cc-100/%s.txt.xz\"\n", - "VALID_CODES = [\n", - " \"am\", \"ar\", \"as\", \"az\", \"be\", \"bg\", \"bn\", \"bn_rom\", \"br\", \"bs\", \"ca\", \"cs\", \"cy\", \"da\", \"de\", \n", - " \"el\", \"en\", \"eo\", \"es\", \"et\", \"eu\", \"fa\", \"ff\", \"fi\", \"fr\", \"fy\", \"ga\", \"gd\", \"gl\", \"gn\", \"gu\", \n", - " \"ha\", \"he\", \"hi\", \"hi_rom\", \"hr\", \"ht\", \"hu\", \"hy\", \"id\", \"ig\", \"is\", \"it\", \"ja\", \"jv\", \"ka\", \n", - " \"kk\", \"km\", \"kn\", \"ko\", \"ku\", \"ky\", \"la\", \"lg\", \"li\", \"ln\", \"lo\", \"lt\", \"lv\", \"mg\", \"mk\", \"ml\", \n", - " \"mn\", \"mr\", \"ms\", \"my\", \"my_zaw\", \"ne\", \"nl\", \"no\", \"ns\", \"om\", \"or\", \"pa\", \"pl\", \"ps\", \"pt\", \n", - " \"qu\", \"rm\", \"ro\", \"ru\", \"sa\", \"si\", \"sc\", \"sd\", \"sk\", \"sl\", \"so\", \"sq\", \"sr\", \"ss\", \"su\", \"sv\", \n", - " \"sw\", \"ta\", \"ta_rom\", \"te\", \"te_rom\", \"th\", \"tl\", \"tn\", \"tr\", \"ug\", \"uk\", \"ur\", \"ur_rom\", \"uz\", \n", - " \"vi\", \"wo\", \"xh\", \"yi\", \"yo\", \"zh-Hans\", \"zh-Hant\", \"zu\", \n", - "]\n", - "\n", - "def CC100(root, language_code, use_caching=True):\n", - " if language_code not in VALID_CODES:\n", - " raise ValueError(f\"Invalid language code {language_code}\")\n", - " url = URL % language_code\n", - " if use_caching:\n", - " cache_compressed_dp = IterableWrapper([url]).on_disk_cache(\n", - " HttpReader, \n", - " op_map=lambda x: (x[0], x[1].read()),\n", - " filepath_fn=lambda x: os.path.join(root, os.path.basename(x)))\n", - " \n", - " cache_decompressed_dp = cache_compressed_dp.map(lambda x: (x[0])).on_disk_cache(\n", - " lambda x: FileLoader(x).read_from_xz(),\n", - " op_map=lambda x: 
(x[0], x[1].read()),\n", - " filepath_fn=lambda x: os.path.join(root, os.path.basename(x).rstrip(\".xz\"))) \n", - " \n", - " data_dp = cache_decompressed_dp\n", - " else:\n", - " data_dp = HttpReader([url]).read_from_xz()\n", - " units_dp = data_dp.readlines().map(lambda x: (language_code, x[1])).map(lambda x: (x[0], x[1].decode()))\n", - " return units_dp\n" - ], - "outputs": [], - "metadata": {} - }, - { - "cell_type": "code", - "execution_count": 3, - "source": [ - "# Sample from multi-gigabyte-size compressed dataset without downloading the whole thing\n", - "# This executes very fast\n", - "import time\n", - "start_time = time.time()\n", - "for i, x in enumerate(CC100(ROOT_DIR, 'en', use_caching=False)):\n", - " print(x)\n", - " if i > 5:\n", - " break\n", - "print(f\"Execution time {(time.time() - start_time):.2f} secs\")" - ], - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "('en', 'Belmont Estate is on the market for $63 million and boasts roughly 22,000 square feet of luxurious finishes and elaborate architecture on 1.28 acres. Listed on Thursday, the home is being sold by high-end real estate firm Sotheby’s International Realty Canada.')\n", - "('en', '“Within the city we’ve had homes that have sold for $56 million, $33 million, $31 million but this will be the record of the offering price,” listing agent Christa Frosch of Sotheby’s tells BuzzBuzzNews.')\n", - "('en', 'The three-storey home has five bedrooms, twelve bathrooms and an elevator in the west wing. Built to entertain, two main gallery halls can seat up to 100 guests. The Italian-inspired kitchen includes a fireplace and walls and ceilings throughout the home feature murals and artwork. Lavish amenities include an indoor pool and sauna, a six-car garage and a private entrance in-law’s suite.')\n", - "('en', 'Surrounding the property is a Versailles-inspired garden with a variety of trees, plants and an orchard. 
In the spring, over 12,000 flowers bloom in the tiered, three-level garden.')\n", - "('en', 'According to Frosch, the listing has received global attention and, despite being on the market for only 24 hours, buyers are already showing interest.')\n", - "('en', '“We just went to the market yesterday, it’s private through Sotheby’s and we’ve already started to get calls,” says Frosch.')\n", - "('en', '')\n", - "Execution time 0.55 secs\n" - ] - } - ], - "metadata": {} - }, - { - "cell_type": "code", - "execution_count": 5, - "source": [ - "# cache\n", - "# This cell is very slow to run the first time as it downloads a dataset from a very slow server\n", - "next(iter(CC100(ROOT_DIR, 'ha', use_caching=True)))" - ], - "outputs": [ - { - "data": { - "text/plain": "('ha',\n 'Dangantaka tsakanin kasashen Masar da Turkiya ta yi tsami a cikin yan kwanakin nan, saboda sanin iyakokin da kowanne daga cikin yake mallaka a tekun Mediterranean .')" - }, - "execution_count": 5, - "metadata": {}, - "output_type": "execute_result" - } - ], - "metadata": {} - }, - { - "cell_type": "code", - "execution_count": 5, - "source": [ - "# cache\n", - "# This cell is very slow to run the first time as it downloads a dataset from a very slow server\n", - "next(iter(CC100(ROOT_DIR, 'yi', use_caching=True)))" - ], - "outputs": [ - { - "output_type": "execute_result", - "data": { - "text/plain": [ - "('yi', 'קאַטעגאָריע:cs-- – װיקיװערטערבוך')" - ] - }, - "metadata": {}, - "execution_count": 5 - } - ], - "metadata": {} - }, - { - "cell_type": "code", - "execution_count": 6, - "source": [ - "import itertools\n", - "# Cache two of the datasets. The backend rate-limits connections to 1 per ip, \n", - "# so you can't have more than one dataset running without caching\n", - "\n", - "# If you do \"run all\" this may fail because the previous http connections might still be alive\n", - "\n", - "z1 = CC100(ROOT_DIR, 'li', use_caching=False).cycle()\n", - "z2 = CC100(ROOT_DIR, 'ha', use_caching=True).cycle()\n", - "z3 = CC100(ROOT_DIR, 'yi', use_caching=True).cycle()\n", - "\n", - "z = SampleMultiplexer({z1: 0.7, z2: 0.2, z3: 0.1})\n", - "\n", - "l = list(itertools.islice(z, 0, 500000))\n", - "print(l[0:20])\n", - "\n", - "ratio = sum(1 for k,v in l if k == 'li') / len(l)\n", - "print(f\"Expected ratio: 0.7, actual {ratio}\")\n" - ], - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "[('li', \"Kop van 't Ende - Wikipedia\"), ('li', ''), ('li', \"Coos is 'n in 1853 gestiech graofsjap in Oregon, VS. Coos is verneump nao de Cook-koo-oose, 'n inheims Amerikaans stam, die allewijl neet mie besteit. 
De hoofplaots vaan 't graofsjap is Coquille.\"), ('ha', 'Dangantaka tsakanin kasashen Masar da Turkiya ta yi tsami a cikin yan kwanakin nan, saboda sanin iyakokin da kowanne daga cikin yake mallaka a tekun Mediterranean .'), ('yi', 'קאַטעגאָריע:cs-- – װיקיװערטערבוך'), ('li', \"'t Graofsjap heet 'n totaal oppervlak vaan 4.678 km² boevaan 4.145 km² land is en 533 km² water.\"), ('ha', \"Kamfanin dillancin labaran IRNA na kasar Iran ya nakalto Ahmad Abu-Zaid kakakin ma'aikatar harkokin wajen kasar Masar yarjejeniyar da kasar Masar ta cimma da kasar Cyprus kan iyakokin da kowanne daga cikinsu yake mallaka daga gabacin tekun Mediterranean ta zama doka ce, kuma duk wanda yayi kokarin taka ta Masar zata kalubalance shi.\"), ('ha', 'Abu-Zaid ya kara da cewa yarjejeniyar rabon kan iyaka a cikin tekun Mediterranean , yarjejjeniya ce ta kasa da kasa wacce Majalisar dinkin duniya ta amince da ita.'), ('li', \"Volgens de census vaan 2000 bedroog 't totaol bevolkingsaontal in Coos County 62.779.\"), ('ha', 'Amma ministan harkokin wajen kasar Turkiya Maulud Chavis-Uglu, a ranar litinin da ta gabata ce ya bada sanarwan cewa kasar Turkiya ba ta amince da yarjejeniyar da kasashen Masar ta Cyprus suka cimma kan rabon kan iyaka da kuma amfani da tekun Mediterranean a shekara ta 2013 ba.'), ('li', \"De twie belaankriekste plaotse vaan 't graofsjap zien:\"), ('ha', 'Wani Sabon Sabani Ya Kunno kai Tsakanin Kasashen Masar Da Turkiyya'), ('li', \"Gesjreve in 't Mestreechs\"), ('li', \"Dees pazjena is 't lèts verangerd op 9 mrt 2013, 04:24.\"), ('ha', 'Masar Ta Zargi Mahukuntan Turkiyya Da Kokarin Yin Zagon Kasa Ga Harkar Tattalin Arzikin Kasarta'), ('li', ''), ('li', \"'ne Centimeter (aofkorting: cm) is geliek aon 'ne hoonderdste meter, ofwel 0,01 meter. Dit is weer geliek aon 10 millimeter. 't Voorveugsel centi is aofkomsteg vaan 't Latiense centum, wat hoonderd beteikent. In 't dageleks leve weurt de maot dèks gebruuk: me gebruuk 't veur 't mete vaan liechaamslengde, meubelaofmetinge, kleiding, enz. 
In technische teikeninge gebruuk me liever de millimeter.\"), ('ha', ''), ('li', \"'n Meetlint weurt dèks ouch 'ne centimeter geneump.\"), ('li', \"Gesjreve in 't Mestreechs\")]\n", - "Expected ratio: 0.7, actual 0.699058\n" - ] - } - ], - "metadata": {} - }, - { - "cell_type": "code", - "execution_count": 8, - "source": [ - "next(iter(CC100(ROOT_DIR, 'ha', use_caching=False).lines_to_paragraphs()))" - ], - "outputs": [ - { - "output_type": "execute_result", - "data": { - "text/plain": [ - "('ha',\n", - " \"Dangantaka tsakanin kasashen Masar da Turkiya ta yi tsami a cikin yan kwanakin nan, saboda sanin iyakokin da kowanne daga cikin yake mallaka a tekun Mediterranean .\\nKamfanin dillancin labaran IRNA na kasar Iran ya nakalto Ahmad Abu-Zaid kakakin ma'aikatar harkokin wajen kasar Masar yarjejeniyar da kasar Masar ta cimma da kasar Cyprus kan iyakokin da kowanne daga cikinsu yake mallaka daga gabacin tekun Mediterranean ta zama doka ce, kuma duk wanda yayi kokarin taka ta Masar zata kalubalance shi.\\nAbu-Zaid ya kara da cewa yarjejeniyar rabon kan iyaka a cikin tekun Mediterranean , yarjejjeniya ce ta kasa da kasa wacce Majalisar dinkin duniya ta amince da ita.\\nAmma ministan harkokin wajen kasar Turkiya Maulud Chavis-Uglu, a ranar litinin da ta gabata ce ya bada sanarwan cewa kasar Turkiya ba ta amince da yarjejeniyar da kasashen Masar ta Cyprus suka cimma kan rabon kan iyaka da kuma amfani da tekun Mediterranean a shekara ta 2013 ba.\\nWani Sabon Sabani Ya Kunno kai Tsakanin Kasashen Masar Da Turkiyya\\nMasar Ta Zargi Mahukuntan Turkiyya Da Kokarin Yin Zagon Kasa Ga Harkar Tattalin Arzikin Kasarta\")" - ] - }, - "metadata": {}, - "execution_count": 8 - } - ], - "metadata": {} - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.5" - } - }, - "nbformat": 4, - "nbformat_minor": 5 + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "source": [ + "import torch\n", + "import os\n", + "\n", + "from torchdata.datapipes.iter import (\n", + " FileOpener,\n", + " HttpReader,\n", + " IterableWrapper,\n", + " SampleMultiplexer,\n", + ")\n", + "\n", + "ROOT_DIR = os.path.expanduser('~/.torchdata/CC100') # This directory needs to be crated and set", + ], + "outputs": [], + "metadata": {}, + }, + { + "cell_type": "code", + "execution_count": 2, + "source": [ + "# CC100 support (http://data.statmt.org/cc-100/)\n", + "\n", + 'URL="http://data.statmt.org/cc-100/%s.txt.xz"\n', + "VALID_CODES = [\n", + ' "am", "ar", "as", "az", "be", "bg", "bn", "bn_rom", "br", "bs", "ca", "cs", "cy", "da", "de", \n', + ' "el", "en", "eo", "es", "et", "eu", "fa", "ff", "fi", "fr", "fy", "ga", "gd", "gl", "gn", "gu", \n', + ' "ha", "he", "hi", "hi_rom", "hr", "ht", "hu", "hy", "id", "ig", "is", "it", "ja", "jv", "ka", \n', + ' "kk", "km", "kn", "ko", "ku", "ky", "la", "lg", "li", "ln", "lo", "lt", "lv", "mg", "mk", "ml", \n', + ' "mn", "mr", "ms", "my", "my_zaw", "ne", "nl", "no", "ns", "om", "or", "pa", "pl", "ps", "pt", \n', + ' "qu", "rm", "ro", "ru", "sa", "si", "sc", "sd", "sk", "sl", "so", "sq", "sr", "ss", "su", "sv", \n', + ' "sw", "ta", "ta_rom", "te", "te_rom", "th", "tl", "tn", "tr", "ug", "uk", "ur", "ur_rom", "uz", \n', + ' "vi", "wo", "xh", "yi", "yo", "zh-Hans", "zh-Hant", "zu", \n', 
+ "]\n", + "\n", + "def CC100(root, language_code, use_caching=True):\n", + " if language_code not in VALID_CODES:\n", + ' raise ValueError(f"Invalid language code {language_code}")\n', + " url = URL % language_code\n", + " if use_caching:\n", + " cache_compressed_dp = IterableWrapper([url]).on_disk_cache(\n", + " HttpReader, \n", + " op_map=lambda x: (x[0], x[1].read()),\n", + " filepath_fn=lambda x: os.path.join(root, os.path.basename(x)))\n", + " \n", + " cache_decompressed_dp = cache_compressed_dp.map(lambda x: (x[0])).on_disk_cache(\n", + " lambda x: FileOpener(x, mode='b').read_from_xz(),\n", + " op_map=lambda x: (x[0], x[1].read()),\n", + ' filepath_fn=lambda x: os.path.join(root, os.path.basename(x).rstrip(".xz"))) \n', + " \n", + " data_dp = cache_decompressed_dp\n", + " else:\n", + " data_dp = HttpReader([url]).read_from_xz()\n", + " units_dp = data_dp.readlines().map(lambda x: (language_code, x[1])).map(lambda x: (x[0], x[1].decode()))\n", + " return units_dp\n", + ], + "outputs": [], + "metadata": {}, + }, + { + "cell_type": "code", + "execution_count": 3, + "source": [ + "# Sample from multi-gigabyte-size compressed dataset without downloading the whole thing\n", + "# This executes very fast\n", + "import time\n", + "start_time = time.time()\n", + "for i, x in enumerate(CC100(ROOT_DIR, 'en', use_caching=False)):\n", + " print(x)\n", + " if i > 5:\n", + " break\n", + 'print(f"Execution time {(time.time() - start_time):.2f} secs")', + ], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "('en', 'Belmont Estate is on the market for $63 million and boasts roughly 22,000 square feet of luxurious finishes and elaborate architecture on 1.28 acres. Listed on Thursday, the home is being sold by high-end real estate firm Sotheby’s International Realty Canada.')\n", + "('en', '“Within the city we’ve had homes that have sold for $56 million, $33 million, $31 million but this will be the record of the offering price,” listing agent Christa Frosch of Sotheby’s tells BuzzBuzzNews.')\n", + "('en', 'The three-storey home has five bedrooms, twelve bathrooms and an elevator in the west wing. Built to entertain, two main gallery halls can seat up to 100 guests. The Italian-inspired kitchen includes a fireplace and walls and ceilings throughout the home feature murals and artwork. Lavish amenities include an indoor pool and sauna, a six-car garage and a private entrance in-law’s suite.')\n", + "('en', 'Surrounding the property is a Versailles-inspired garden with a variety of trees, plants and an orchard. 
In the spring, over 12,000 flowers bloom in the tiered, three-level garden.')\n", + "('en', 'According to Frosch, the listing has received global attention and, despite being on the market for only 24 hours, buyers are already showing interest.')\n", + "('en', '“We just went to the market yesterday, it’s private through Sotheby’s and we’ve already started to get calls,” says Frosch.')\n", + "('en', '')\n", + "Execution time 0.55 secs\n", + ], + } + ], + "metadata": {}, + }, + { + "cell_type": "code", + "execution_count": 5, + "source": [ + "# cache\n", + "# This cell is very slow to run the first time as it downloads a dataset from a very slow server\n", + "next(iter(CC100(ROOT_DIR, 'ha', use_caching=True)))", + ], + "outputs": [ + { + "data": { + "text/plain": "('ha',\n 'Dangantaka tsakanin kasashen Masar da Turkiya ta yi tsami a cikin yan kwanakin nan, saboda sanin iyakokin da kowanne daga cikin yake mallaka a tekun Mediterranean .')" + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result", + } + ], + "metadata": {}, + }, + { + "cell_type": "code", + "execution_count": 5, + "source": [ + "# cache\n", + "# This cell is very slow to run the first time as it downloads a dataset from a very slow server\n", + "next(iter(CC100(ROOT_DIR, 'yi', use_caching=True)))", + ], + "outputs": [ + { + "output_type": "execute_result", + "data": {"text/plain": ["('yi', 'קאַטעגאָריע:cs-- – װיקיװערטערבוך')"]}, + "metadata": {}, + "execution_count": 5, + } + ], + "metadata": {}, + }, + { + "cell_type": "code", + "execution_count": 6, + "source": [ + "import itertools\n", + "# Cache two of the datasets. The backend rate-limits connections to 1 per ip, \n", + "# so you can't have more than one dataset running without caching\n", + "\n", + '# If you do "run all" this may fail because the previous http connections might still be alive\n', + "\n", + "z1 = CC100(ROOT_DIR, 'li', use_caching=False).cycle()\n", + "z2 = CC100(ROOT_DIR, 'ha', use_caching=True).cycle()\n", + "z3 = CC100(ROOT_DIR, 'yi', use_caching=True).cycle()\n", + "\n", + "z = SampleMultiplexer({z1: 0.7, z2: 0.2, z3: 0.1})\n", + "\n", + "l = list(itertools.islice(z, 0, 500000))\n", + "print(l[0:20])\n", + "\n", + "ratio = sum(1 for k,v in l if k == 'li') / len(l)\n", + 'print(f"Expected ratio: 0.7, actual {ratio}")\n', + ], + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "[('li', \"Kop van 't Ende - Wikipedia\"), ('li', ''), ('li', \"Coos is 'n in 1853 gestiech graofsjap in Oregon, VS. Coos is verneump nao de Cook-koo-oose, 'n inheims Amerikaans stam, die allewijl neet mie besteit. 
De hoofplaots vaan 't graofsjap is Coquille.\"), ('ha', 'Dangantaka tsakanin kasashen Masar da Turkiya ta yi tsami a cikin yan kwanakin nan, saboda sanin iyakokin da kowanne daga cikin yake mallaka a tekun Mediterranean .'), ('yi', 'קאַטעגאָריע:cs-- – װיקיװערטערבוך'), ('li', \"'t Graofsjap heet 'n totaal oppervlak vaan 4.678 km² boevaan 4.145 km² land is en 533 km² water.\"), ('ha', \"Kamfanin dillancin labaran IRNA na kasar Iran ya nakalto Ahmad Abu-Zaid kakakin ma'aikatar harkokin wajen kasar Masar yarjejeniyar da kasar Masar ta cimma da kasar Cyprus kan iyakokin da kowanne daga cikinsu yake mallaka daga gabacin tekun Mediterranean ta zama doka ce, kuma duk wanda yayi kokarin taka ta Masar zata kalubalance shi.\"), ('ha', 'Abu-Zaid ya kara da cewa yarjejeniyar rabon kan iyaka a cikin tekun Mediterranean , yarjejjeniya ce ta kasa da kasa wacce Majalisar dinkin duniya ta amince da ita.'), ('li', \"Volgens de census vaan 2000 bedroog 't totaol bevolkingsaontal in Coos County 62.779.\"), ('ha', 'Amma ministan harkokin wajen kasar Turkiya Maulud Chavis-Uglu, a ranar litinin da ta gabata ce ya bada sanarwan cewa kasar Turkiya ba ta amince da yarjejeniyar da kasashen Masar ta Cyprus suka cimma kan rabon kan iyaka da kuma amfani da tekun Mediterranean a shekara ta 2013 ba.'), ('li', \"De twie belaankriekste plaotse vaan 't graofsjap zien:\"), ('ha', 'Wani Sabon Sabani Ya Kunno kai Tsakanin Kasashen Masar Da Turkiyya'), ('li', \"Gesjreve in 't Mestreechs\"), ('li', \"Dees pazjena is 't lèts verangerd op 9 mrt 2013, 04:24.\"), ('ha', 'Masar Ta Zargi Mahukuntan Turkiyya Da Kokarin Yin Zagon Kasa Ga Harkar Tattalin Arzikin Kasarta'), ('li', ''), ('li', \"'ne Centimeter (aofkorting: cm) is geliek aon 'ne hoonderdste meter, ofwel 0,01 meter. Dit is weer geliek aon 10 millimeter. 't Voorveugsel centi is aofkomsteg vaan 't Latiense centum, wat hoonderd beteikent. In 't dageleks leve weurt de maot dèks gebruuk: me gebruuk 't veur 't mete vaan liechaamslengde, meubelaofmetinge, kleiding, enz. 
In technische teikeninge gebruuk me liever de millimeter.\"), ('ha', ''), ('li', \"'n Meetlint weurt dèks ouch 'ne centimeter geneump.\"), ('li', \"Gesjreve in 't Mestreechs\")]\n", + "Expected ratio: 0.7, actual 0.699058\n", + ], + } + ], + "metadata": {}, + }, + { + "cell_type": "code", + "execution_count": 8, + "source": ["next(iter(CC100(ROOT_DIR, 'ha', use_caching=False).lines_to_paragraphs()))"], + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "('ha',\n", + ' "Dangantaka tsakanin kasashen Masar da Turkiya ta yi tsami a cikin yan kwanakin nan, saboda sanin iyakokin da kowanne daga cikin yake mallaka a tekun Mediterranean .\\nKamfanin dillancin labaran IRNA na kasar Iran ya nakalto Ahmad Abu-Zaid kakakin ma\'aikatar harkokin wajen kasar Masar yarjejeniyar da kasar Masar ta cimma da kasar Cyprus kan iyakokin da kowanne daga cikinsu yake mallaka daga gabacin tekun Mediterranean ta zama doka ce, kuma duk wanda yayi kokarin taka ta Masar zata kalubalance shi.\\nAbu-Zaid ya kara da cewa yarjejeniyar rabon kan iyaka a cikin tekun Mediterranean , yarjejjeniya ce ta kasa da kasa wacce Majalisar dinkin duniya ta amince da ita.\\nAmma ministan harkokin wajen kasar Turkiya Maulud Chavis-Uglu, a ranar litinin da ta gabata ce ya bada sanarwan cewa kasar Turkiya ba ta amince da yarjejeniyar da kasashen Masar ta Cyprus suka cimma kan rabon kan iyaka da kuma amfani da tekun Mediterranean a shekara ta 2013 ba.\\nWani Sabon Sabani Ya Kunno kai Tsakanin Kasashen Masar Da Turkiyya\\nMasar Ta Zargi Mahukuntan Turkiyya Da Kokarin Yin Zagon Kasa Ga Harkar Tattalin Arzikin Kasarta")', + ] + }, + "metadata": {}, + "execution_count": 8, + } + ], + "metadata": {}, + }, + ], + "metadata": { + "kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, + "language_info": { + "codemirror_mode": {"name": "ipython", "version": 3}, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.5", + }, + }, + "nbformat": 4, + "nbformat_minor": 5, } diff --git a/examples/text/amazonreviewpolarity.py b/examples/text/amazonreviewpolarity.py index 1084fff69..bb810c167 100644 --- a/examples/text/amazonreviewpolarity.py +++ b/examples/text/amazonreviewpolarity.py @@ -1,7 +1,7 @@ # Copyright (c) Facebook, Inc. and its affiliates. 
import os -from torchdata.datapipes.iter import FileLoader, GDriveReader, IterableWrapper +from torchdata.datapipes.iter import FileOpener, GDriveReader, IterableWrapper from .utils import _add_docstring_header, _create_dataset_directory, _wrap_split_argument @@ -43,7 +43,7 @@ def AmazonReviewPolarity(root, split): ) cache_dp = GDriveReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True) - cache_dp = FileLoader(cache_dp) + cache_dp = FileOpener(cache_dp, mode="b") # stack TAR extractor on top of loader DP extracted_files = cache_dp.read_from_tar() diff --git a/examples/text/imdb.py b/examples/text/imdb.py index 7559f6948..a3e84844f 100644 --- a/examples/text/imdb.py +++ b/examples/text/imdb.py @@ -2,7 +2,7 @@ import os from pathlib import Path -from torchdata.datapipes.iter import FileLoader, HttpReader, IterableWrapper +from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper from .utils import _add_docstring_header, _create_dataset_directory, _wrap_split_argument @@ -40,7 +40,7 @@ def IMDB(root, split): ) cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True) - cache_dp = FileLoader(cache_dp) + cache_dp = FileOpener(cache_dp, mode="b") # stack TAR extractor on top of load files data pipe extracted_files = cache_dp.read_from_tar() diff --git a/examples/text/squad1.py b/examples/text/squad1.py index ed16a2286..d453db220 100644 --- a/examples/text/squad1.py +++ b/examples/text/squad1.py @@ -1,7 +1,7 @@ # Copyright (c) Facebook, Inc. and its affiliates. import os -from torchdata.datapipes.iter import FileLoader, HttpReader, IterableWrapper, IterDataPipe +from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper, IterDataPipe from .utils import _add_docstring_header, _create_dataset_directory, _wrap_split_argument @@ -61,7 +61,7 @@ def SQuAD1(root, split): ) cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True) - cache_dp = FileLoader(cache_dp) + cache_dp = FileOpener(cache_dp, mode="b") # stack custom data pipe on top of JSON reader to orchestrate data samples for Q&A dataset return _ParseSQuADQAData(cache_dp.parse_json_files()) diff --git a/examples/text/squad2.py b/examples/text/squad2.py index d4c713645..0e3420e53 100644 --- a/examples/text/squad2.py +++ b/examples/text/squad2.py @@ -1,7 +1,7 @@ # Copyright (c) Facebook, Inc. and its affiliates. 
import os -from torchdata.datapipes.iter import FileLoader, HttpReader, IterableWrapper, IterDataPipe +from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper, IterDataPipe from .utils import _add_docstring_header, _create_dataset_directory, _wrap_split_argument @@ -61,7 +61,7 @@ def SQuAD2(root, split): ) cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True) - cache_dp = FileLoader(cache_dp) + cache_dp = FileOpener(cache_dp, mode="b") # stack custom data pipe on top of JSON reader to orchestrate data samples for Q&A dataset return _ParseSQuADQAData(cache_dp.parse_json_files()) diff --git a/examples/vision/caltech101.py b/examples/vision/caltech101.py index 31a507cd9..f7f958dc5 100644 --- a/examples/vision/caltech101.py +++ b/examples/vision/caltech101.py @@ -5,7 +5,7 @@ import torch from torch.utils.data.datapipes.utils.decoder import imagehandler, mathandler from torchdata.datapipes.iter import ( - FileLoader, + FileOpener, Filter, IterableWrapper, IterKeyZipper, @@ -89,14 +89,14 @@ def collate_sample(data): def Caltech101(root=ROOT): anns_dp = IterableWrapper([os.path.join(root, "Annotations.tar")]) - anns_dp = FileLoader(anns_dp) + anns_dp = FileOpener(anns_dp, mode="b") anns_dp = TarArchiveReader(anns_dp) anns_dp = Filter(anns_dp, is_ann) anns_dp = RoutedDecoder(anns_dp, mathandler()) anns_dp = Mapper(anns_dp, collate_ann) images_dp = IterableWrapper([os.path.join(root, "101_ObjectCategories.tar.gz")]) - images_dp = FileLoader(images_dp) + images_dp = FileOpener(images_dp, mode="b") images_dp = TarArchiveReader(images_dp) images_dp = Filter(images_dp, is_not_background_image) images_dp = Filter(images_dp, is_not_rogue_image) diff --git a/examples/vision/caltech256.py b/examples/vision/caltech256.py index 08e3c67a8..158bed25a 100644 --- a/examples/vision/caltech256.py +++ b/examples/vision/caltech256.py @@ -3,7 +3,7 @@ from torch.utils.data.datapipes.utils.decoder import imagehandler -from torchdata.datapipes.iter import FileLoader, IterableWrapper, Mapper, RoutedDecoder, TarArchiveReader +from torchdata.datapipes.iter import FileOpener, IterableWrapper, Mapper, RoutedDecoder, TarArchiveReader # Download size is ~1.2 GB so fake data is provided @@ -23,7 +23,7 @@ def collate_sample(data): def Caltech256(root=ROOT): dp = IterableWrapper([os.path.join(root, "256_ObjectCategories.tar")]) - dp = FileLoader(dp) + dp = FileOpener(dp, mode="b") dp = TarArchiveReader(dp) dp = RoutedDecoder(dp, imagehandler("pil")) return Mapper(dp, collate_sample) diff --git a/test/test_local_io.py b/test/test_local_io.py index dbfbbfb01..9fcef2b74 100644 --- a/test/test_local_io.py +++ b/test/test_local_io.py @@ -19,7 +19,7 @@ CSVParser, Extractor, FileLister, - FileLoader, + FileOpener, HashChecker, IoPathFileLister, IoPathFileLoader, @@ -104,7 +104,7 @@ def make_path(fname): csv_files = {"1.csv": "key,item\na,1\nb,2", "empty.csv": "", "empty2.csv": "\n"} self._custom_files_set_up(csv_files) datapipe1 = IterableWrapper([make_path(fname) for fname in ["1.csv", "empty.csv", "empty2.csv"]]) - datapipe2 = FileLoader(datapipe1) + datapipe2 = FileOpener(datapipe1, mode="b") datapipe3 = datapipe2.map(get_name) # Functional Test: yield one row at time from each file, skipping over empty content @@ -140,7 +140,7 @@ def get_name(path_and_stream): csv_files = {"1.csv": "key,item\na,1\nb,2", "empty.csv": "", "empty2.csv": "\n"} self._custom_files_set_up(csv_files) datapipe1 = FileLister(self.temp_dir.name, "*.csv") - datapipe2 = FileLoader(datapipe1) + datapipe2 = 
FileOpener(datapipe1, mode="b") datapipe3 = datapipe2.map(get_name) # Functional Test: yield one row at a time as dict, with the first row being the header (key) @@ -184,7 +184,7 @@ def fill_hash_dict(): fill_hash_dict() datapipe1 = FileLister(self.temp_dir.name, "*") - datapipe2 = FileLoader(datapipe1) + datapipe2 = FileOpener(datapipe1, mode="b") hash_check_dp = HashChecker(datapipe2, hash_dict) # Functional Test: Ensure the DataPipe values are unchanged if the hashes are the same @@ -223,7 +223,7 @@ def fill_hash_dict(): self.assertEqual(expected_stream.read(), actual_stream.read()) # __len__ Test: returns the length of source DataPipe - with self.assertRaisesRegex(TypeError, "FileLoaderIterDataPipe instance doesn't have valid length"): + with self.assertRaisesRegex(TypeError, "FileOpenerIterDataPipe instance doesn't have valid length"): len(hash_check_dp) def test_json_parser_iterdatapipe(self): @@ -240,7 +240,7 @@ def is_nonempty_json(path_and_stream): } self._custom_files_set_up(json_files) datapipe1 = IterableWrapper([f"{self.temp_dir.name}/{fname}" for fname in ["empty.json", "1.json", "2.json"]]) - datapipe2 = FileLoader(datapipe1) + datapipe2 = FileOpener(datapipe1, mode="b") datapipe3 = datapipe2.map(get_name) datapipe_empty = datapipe3.filter(is_empty_json) datapipe_nonempty = datapipe3.filter(is_nonempty_json) @@ -317,12 +317,12 @@ def _write_test_tar_gz_files(self): def test_tar_archive_reader_iterdatapipe(self): self._write_test_tar_files() datapipe1 = FileLister(self.temp_dir.name, "*.tar") - datapipe2 = FileLoader(datapipe1) + datapipe2 = FileOpener(datapipe1, mode="b") tar_reader_dp = TarArchiveReader(datapipe2) self._write_test_tar_gz_files() datapipe_gz_1 = FileLister(self.temp_dir.name, "*.tar.gz") - datapipe_gz_2 = FileLoader(datapipe_gz_1) + datapipe_gz_2 = FileOpener(datapipe_gz_1, mode="b") gz_reader_dp = TarArchiveReader(datapipe_gz_2) # Functional Test: Read extracted files before reaching the end of the tarfile @@ -358,7 +358,7 @@ def _write_test_zip_files(self): def test_zip_archive_reader_iterdatapipe(self): self._write_test_zip_files() datapipe1 = FileLister(self.temp_dir.name, "*.zip") - datapipe2 = FileLoader(datapipe1) + datapipe2 = FileOpener(datapipe1, mode="b") zip_reader_dp = ZipArchiveReader(datapipe2) # Functional Test: read extracted files before reaching the end of the zipfile @@ -394,7 +394,7 @@ def test_xz_archive_reader_iterdatapipe(self): # Whereas we create multiple .xz files in the same directories below. 
self._write_test_xz_files() datapipe1 = FileLister(self.temp_dir.name, "*.xz") - datapipe2 = FileLoader(datapipe1) + datapipe2 = FileOpener(datapipe1, mode="b") xz_reader_dp = XzFileReader(datapipe2) # Functional Test: Read extracted files before reaching the end of the xzfile @@ -453,19 +453,19 @@ def test_extractor_iterdatapipe(self): # Functional Test: work with .tar files tar_file_dp = FileLister(self.temp_dir.name, "*.tar") - tar_load_dp = FileLoader(tar_file_dp) + tar_load_dp = FileOpener(tar_file_dp, mode="b") tar_extract_dp = Extractor(tar_load_dp, file_type="tar") self._extractor_tar_test_helper(self.temp_files, tar_extract_dp) # Functional test: work with .tar.gz files tar_gz_file_dp = FileLister(self.temp_dir.name, "*.tar.gz") - tar_gz_load_dp = FileLoader(tar_gz_file_dp) + tar_gz_load_dp = FileOpener(tar_gz_file_dp, mode="b") tar_gz_extract_dp = Extractor(tar_gz_load_dp, file_type="tar") self._extractor_tar_test_helper(self.temp_files, tar_gz_extract_dp) # Functional Test: work with .gz files gz_file_dp = IterableWrapper([f"{self.temp_dir.name}/temp.gz"]) - gz_load_dp = FileLoader(gz_file_dp) + gz_load_dp = FileOpener(gz_file_dp, mode="b") gz_extract_dp = Extractor(gz_load_dp, file_type="gzip") for _, gz_stream in gz_extract_dp: with open(self.temp_files[0], "rb") as f: @@ -473,7 +473,7 @@ def test_extractor_iterdatapipe(self): # Functional Test: work with .zip files zip_file_dp = FileLister(self.temp_dir.name, "*.zip") - zip_load_dp = FileLoader(zip_file_dp) + zip_load_dp = FileOpener(zip_file_dp, mode="b") zip_extract_dp = zip_load_dp.extract(file_type="zip") for _, zip_stream in zip_extract_dp: for fname in self.temp_files: @@ -482,7 +482,7 @@ def test_extractor_iterdatapipe(self): # Functional Test: work with .xz files xz_file_dp = FileLister(self.temp_dir.name, "*.xz") - xz_load_dp = FileLoader(xz_file_dp) + xz_load_dp = FileOpener(xz_file_dp, mode="b") xz_extract_dp = Extractor(xz_load_dp, file_type="lzma") self._extractor_xz_test_helper(xz_extract_dp) @@ -599,7 +599,7 @@ def test_rar_archive_loader(self): self._write_test_rar_files() datapipe1 = FileLister(self.temp_dir.name, "*.rar") - datapipe2 = FileLoader(datapipe1) + datapipe2 = FileOpener(datapipe1, mode="b") rar_loader_dp = RarArchiveLoader(datapipe2) # Functional Test: read extracted files before reaching the end of the rarfile diff --git a/test/test_remote_io.py b/test/test_remote_io.py index 25407dc59..d76ae20bc 100644 --- a/test/test_remote_io.py +++ b/test/test_remote_io.py @@ -8,7 +8,7 @@ from _utils._common_utils_for_test import check_hash_fn, create_temp_dir -from torchdata.datapipes.iter import EndOnDiskCacheHolder, FileLoader, HttpReader, IterableWrapper, OnDiskCacheHolder +from torchdata.datapipes.iter import EndOnDiskCacheHolder, FileOpener, HttpReader, IterableWrapper, OnDiskCacheHolder class TestDataPipeRemoteIO(expecttest.TestCase): @@ -110,7 +110,7 @@ def _gen_filepath_fn(tar_path): # DataPipe Constructor file_cache_dp = OnDiskCacheHolder(tar_cache_dp, filepath_fn=_gen_filepath_fn) - file_cache_dp = FileLoader(file_cache_dp, mode="rb") + file_cache_dp = FileOpener(file_cache_dp, mode="rb") # Functional API file_cache_dp = file_cache_dp.read_from_tar() diff --git a/torchdata/datapipes/iter/__init__.py b/torchdata/datapipes/iter/__init__.py index f1c838a3a..90c6095a2 100644 --- a/torchdata/datapipes/iter/__init__.py +++ b/torchdata/datapipes/iter/__init__.py @@ -7,6 +7,7 @@ Demultiplexer, FileLister, FileLoader, + FileOpener, Filter, Forker, Grouper, @@ -94,6 +95,7 @@ "FSSpecSaver", "FileLister", 
"FileLoader", + "FileOpener", "Filter", "Forker", "GDriveReader",