[SPARK-28120][SS] Rocksdb state storage implementation #24922
Changes from all commits
f59d881
292befe
05568ee
3f5f6b2
f0f2f8d
827ace4
db1ed2b
3ad88eb
41a688d
a484d63
35de011
f983d78
d41560a
ebaea37
818f716
603958b
562f755
4f42068
7d4d5d1
0b129f3
c38bd6c
fcf2a86
fb86f0d
4544abc
8f50519
b3ef8ea
417de56
45e0d05
New file: org/apache/spark/io/FileUtility.java
@@ -0,0 +1,112 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.io;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;

import java.io.*;

public class FileUtility {

  // Character encoding used for tar entry names.
  public static final String ENCODING = "utf-8";

  /**
   * Extract an input tar file into output files and directories.
   * inputTarFileLoc: the input file location for the tar file
   * destDirLoc: destination for the extracted files
   *
   * throws IllegalStateException
   */
  public static void extractTarFile(String inputTarFileLoc, String destDirLoc)
      throws IllegalStateException {
    File inputFile = new File(inputTarFileLoc);
    if (!inputTarFileLoc.endsWith(".tar")) {
      throw new IllegalStateException(String.format(
        "Input File[%s] should end with tar extension.", inputTarFileLoc));
    }
    File destDir = new File(destDirLoc);
    if (destDir.exists() && !destDir.delete()) {
      throw new IllegalStateException(String.format(
        "Couldn't delete the existing destination directory[%s] ", destDirLoc));
    } else if (!destDir.mkdir()) {
      throw new IllegalStateException(String.format(
        "Couldn't create directory %s ", destDirLoc));
    }

    try (InputStream is = new FileInputStream(inputFile);
         TarArchiveInputStream debInputStream = new TarArchiveInputStream(is, ENCODING)) {
      TarArchiveEntry entry;
      while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
        final File outputFile = new File(destDirLoc, entry.getName());
        if (entry.isDirectory()) {
          if (!outputFile.exists() && !outputFile.mkdirs()) {
            throw new IllegalStateException(String.format(
              "Couldn't create directory %s.", outputFile.getAbsolutePath()));
          }
        } else {
          try (OutputStream outputFileStream = new FileOutputStream(outputFile)) {
            IOUtils.copy(debInputStream, outputFileStream);
          }
        }
      }
    } catch (IOException e) {
      throw new IllegalStateException(String.format(
        "extractTarFile failed with exception %s.", e.getMessage()));
    }
  }

  /**
   * Create a tar file from the input source directory location.
   * source: the source directory location
   * destFileLoc: destination of the created tarball
   *
   * throws IllegalStateException
   */
  public static void createTarFile(String source, String destFileLoc)
      throws IllegalStateException {
    File f = new File(destFileLoc);
    if (f.exists() && !f.delete()) {
      throw new IllegalStateException(String.format(
        "Couldn't delete the destination file location[%s]", destFileLoc));
    }
    File folder = new File(source);
    if (!folder.exists()) {
      throw new IllegalStateException(String.format(
        "Source folder[%s] does not exist", source));
    }

    try (FileOutputStream fos = new FileOutputStream(destFileLoc);
         TarArchiveOutputStream tarOs = new TarArchiveOutputStream(fos, ENCODING)) {
      File[] fileNames = folder.listFiles();
      for (File file : fileNames) {
        TarArchiveEntry tar_file = new TarArchiveEntry(file.getName());
        tar_file.setSize(file.length());
        tarOs.putArchiveEntry(tar_file);
        try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file))) {
          IOUtils.copy(bis, tarOs);
          tarOs.closeArchiveEntry();
        }
      }
      tarOs.finish();
    } catch (IOException e) {
      throw new IllegalStateException(String.format(
        "createTarFile failed with exception %s.", e.getMessage()));
    }
  }

}
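For context, a minimal usage sketch of this utility. The paths below are hypothetical and chosen only for illustration; note that createTarFile packs only the files directly under the source directory.

import org.apache.spark.io.FileUtility;

public class FileUtilityExample {
  public static void main(String[] args) {
    // Hypothetical paths, used only for illustration.
    String checkpointDir = "/tmp/rocksdb-state";       // directory to archive
    String tarLocation = "/tmp/rocksdb-state.tar";     // tarball destination (must end with .tar)

    // Pack the files directly under checkpointDir into a single .tar file ...
    FileUtility.createTarFile(checkpointDir, tarLocation);

    // ... and later restore them into a fresh destination directory.
    FileUtility.extractTarFile(tarLocation, "/tmp/rocksdb-state-restored");
  }
}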
New file: org/apache/spark/io/FileUtilitySuite.java
@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.io;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.spark.util.Utils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;

/**
 * Tests functionality of {@link FileUtility}
 */
public class FileUtilitySuite {

  protected File sourceFolder;
  protected File destTarLoc;
  protected File destFolder;

  @Before
  public void setUp() throws IOException {
    String tmpDir = System.getProperty("java.io.tmpdir");
    sourceFolder = Utils.createTempDir(tmpDir, "FileUtilTest-src-" + RandomUtils.nextLong());
    destFolder = Utils.createTempDir(tmpDir, "FileUtilTest-dest-" + RandomUtils.nextLong());
    destTarLoc = File.createTempFile("dest-tar", ".tar");
  }

  @After
  public void tearDown() {
    destTarLoc.delete();
  }

  @Test
  public void testCreationAndExtraction() throws IllegalStateException, IOException {
    // Create a temp file in the source folder
    Assert.assertEquals(sourceFolder.listFiles().length, 0);
    File inputFile = File.createTempFile("source-file", ".tmp", sourceFolder);
    // Create a byte array of size 1 KB with random bytes
    byte[] randomBytes = RandomUtils.nextBytes(1 * 1024);
    FileUtils.writeByteArrayToFile(inputFile, randomBytes);

    // Create the tarball
    destTarLoc.delete();
    Assert.assertFalse(destTarLoc.exists());
    FileUtility.createTarFile(sourceFolder.toString(), destTarLoc.getAbsolutePath());
    Assert.assertTrue(destTarLoc.exists());

    // Extract the tarball
    Assert.assertEquals(destFolder.listFiles().length, 0);
    FileUtility.extractTarFile(destTarLoc.getAbsolutePath(), destFolder.getAbsolutePath());

    // Verify that the extraction was successful
    Assert.assertTrue(destFolder.exists());
    Assert.assertEquals(destFolder.listFiles().length, 1);
    Assert.assertArrayEquals(randomBytes, FileUtils.readFileToByteArray(destFolder.listFiles()[0]));
  }

}
Changed file: pom.xml
@@ -147,6 +147,11 @@
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.rocksdb</groupId>
      <artifactId>rocksdbjni</artifactId>
      <version>${rocksdb.version}</version>
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

Review discussion on the rocksdbjni dependency:

This dependency packs native files for all major operating systems. Flink uses a custom build. Digging into this a bit more, I see some additions and modifications as described here. I understand this is Flink-specific, but the TTL mechanism mentioned there, https://issues.apache.org/jira/browse/FLINK-10471, looks interesting. Structured Streaming currently fetches all state here (in memory) and filters out the timed-out entries; does RocksDB perform well in that case? Shouldn't we have the same or a similar mechanism so that we don't fetch everything and instead delegate this to the state backend (which could even run in the background)?

I will take a look at the Flink build and see if I can pick only the relevant packages in the rocksdb dependency. IMO, abstracting out how a state backend should filter out timed-out state can be treated as a separate problem so that we don't end up increasing the scope of this PR. Once the abstraction is added, we can file a separate JIRA to implement it for the RocksDB state backend.

Ok, makes sense.

RocksDB might not be the best backend. Instead of adding the extra dependency, I think we should just do it as a separate third-party package. The community can always build their own backend based on their needs. Doing it is simple. Can you submit it to https://spark-packages.org/?

@gatorsmile - What are the alternatives if RocksDB is not the best backend? Other streaming technologies such as Flink and Kafka Streams use RocksDB as their primary storage engine. With integration into the Spark codebase, we can change the code in any way later, but if we take the separate-jar route, the kinds of extensions we can make are limited by the current contract. For example, @skonto mentioned one way we could abstract the state storage implementation to get the best out of RocksDB. How could we support such an improvement if we take the Spark-packages route? The current implementation based on an in-memory hashmap is not scalable beyond a point. How shall we go about solving that?
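To make the suggestion about delegating timeout filtering to the state backend more concrete, here is a purely hypothetical sketch; it is not an existing Spark API, and the interface and method names are invented only for illustration of the idea discussed above.

import java.util.Iterator;

// Hypothetical extension of a state-store contract: instead of the caller
// loading every key and filtering expired entries in memory, the backend
// (for example RocksDB with TTL support) returns only the entries that have
// already timed out and can purge them in the background.
public interface TimeoutAwareStateStore<K, V> {

  // Regular point lookups and writes, as in a conventional state-store contract.
  V get(K key);
  void put(K key, V value, long expirationTimestampMs);

  // Backend-side filtering: only keys whose expiration timestamp is at or
  // before watermarkMs are returned, so the caller never materializes the
  // full state in memory.
  Iterator<K> expiredKeys(long watermarkMs);

  // Optional cleanup delegated to the storage engine, which could run
  // asynchronously in the background.
  void purgeExpired(long watermarkMs);
}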