Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor}
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}

import java.io.OutputStream

Expand All @@ -47,12 +47,25 @@ class CobolProcessorInPlace(readerParameters: ReaderParameters,
val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None)

try {
StreamProcessor.processStreamInPlace(copybook,
inputStream match {
case stream: FSStream =>
outputStream.write(stream.getSkippedStartBytes)
case _ =>
}

val recordsProcessed = StreamProcessor.processStreamInPlace(copybook,
options,
inputStream,
recordExtractor,
rawRecordProcessor,
outputStream)

inputStream match {
case stream: FSStream =>
outputStream.write(stream.getSkippedEndBytes)
case _ =>
}
recordsProcessed
} finally {
inputStream.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon
private val fileSize: Long = new File(fileName).length()
private val effectiveSize: Long = math.max(0L, fileSize - fileStartOffset - fileEndOffset)
private var byteIndex = 0L

// Skip the start offset if specified
if (fileStartOffset > 0) {
skipFully(fileStartOffset)
}
private var skipped: Boolean = false

override def size: Long = effectiveSize

Expand All @@ -39,13 +35,50 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon

override def inputFileName: String = fileName

/**
  * Reads and returns the leading `fileStartOffset` bytes of the file so that a caller
  * can preserve them (for example, copy them to an output stream) instead of discarding them.
  *
  * The bytes are returned at most once. When the start offset has already been consumed
  * (the `skipped` flag is set), or when no positive start offset is configured, an empty
  * array is returned and the underlying stream is not touched.
  *
  * NOTE(review): `fileStartOffset` is narrowed with `toInt`, so offsets above `Int.MaxValue`
  * are not supported by this method - confirm callers never pass such values.
  *
  * @return the bytes of the start offset region; shorter than `fileStartOffset` only if the
  *         stream ends early, and empty if there is nothing to return.
  */
def getSkippedStartBytes: Array[Byte] = {
  if (skipped || fileStartOffset <= 0)
    Array.empty[Byte]
  else {
    // Mark the offset as consumed up front so that next() does not skip it a second time.
    skipped = true
    val buffer = new Array[Byte](fileStartOffset.toInt)
    var bytesRead = 0
    var endReached = false
    // A single read() call may legally return fewer bytes than requested, so keep reading
    // until the whole region is consumed or the stream ends. A short read here would both
    // truncate the returned header and leave the stream positioned inside the header.
    while (!endReached && bytesRead < buffer.length) {
      val count = bytesStream.read(buffer, bytesRead, buffer.length - bytesRead)
      if (count <= 0) endReached = true else bytesRead += count
    }
    buffer.take(bytesRead)
  }
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
  * Reads and returns the trailing `fileEndOffset` bytes of the file so that a caller can
  * preserve them (for example, copy them to an output stream) instead of discarding them.
  *
  * The trailing bytes are only read when the effective part of the stream has been fully
  * consumed (`byteIndex >= effectiveSize`) and the stream is not closed. Otherwise, the
  * stream is closed and an empty array is returned.
  *
  * NOTE(review): `fileEndOffset` is narrowed with `toInt`, so offsets above `Int.MaxValue`
  * are not supported by this method - confirm callers never pass such values.
  *
  * @return the bytes of the end offset region; shorter than `fileEndOffset` only if the
  *         stream ends early, and empty if there is nothing to return.
  */
def getSkippedEndBytes: Array[Byte] = {
  if (byteIndex >= effectiveSize && !isClosed) {
    val buffer = new Array[Byte](fileEndOffset.toInt)
    var bytesRead = 0
    var endReached = false
    // A single read() call may legally return fewer bytes than requested, so keep reading
    // until the whole trailer is consumed or the stream ends. Relying on one call could
    // silently drop part of the trailer.
    while (!endReached && bytesRead < buffer.length) {
      val count = bytesStream.read(buffer, bytesRead, buffer.length - bytesRead)
      if (count <= 0) endReached = true else bytesRead += count
    }
    buffer.take(bytesRead)
  } else {
    // The effective data has not been fully consumed (or the stream is already closed),
    // so the trailer cannot be read from the current position.
    close()
    Array.empty[Byte]
  }
}


@throws(classOf[IOException])
override def next(numberOfBytes: Int): Array[Byte] = {
if (numberOfBytes <= 0) throw new IllegalArgumentException("Value of numberOfBytes should be greater than zero.")

if (!skipped && fileStartOffset > 0) {
// Skip the start offset if specified
skipFully(fileStartOffset)
}

// Check if we've reached the effective end of the stream
if (byteIndex >= effectiveSize) {
close()
if (fileEndOffset <= 0)
close()
return new Array[Byte](0)
}

Expand Down Expand Up @@ -77,6 +110,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon
}

private def skipFully(bytesToSkip: Long): Unit = {
skipped = true
var remaining = math.min(bytesToSkip, fileSize)
while (remaining > 0) {
val skipped = bytesStream.skip(remaining)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class CobolProcessorBuilderSuite extends AnyWordSpec with BinaryFileFixture {

assert(count == 4)
assert(outputData.sameElements(
Array(-16,-15,-14,-13).map(_.toByte)
Array(7, 7, 7, -16,-15,-14,-13, 8, 8).map(_.toByte)
))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,13 @@ object SparkCobolProcessor {

val readerParameters = cobolProcessorBuilder.getReaderParameters
val cobolProcessor = cobolProcessorBuilder.build()
val retainStartAndEndOffsets = cobolProcessingStrategy == CobolProcessingStrategy.InPlace

val processor = new SparkCobolProcessor {
private val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)

override def process(listOfFiles: Seq[String], outputPath: String): Long = {
getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, readerParameters, rawRecordProcessorOpt.get, sconf, numberOfThreads)
getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, readerParameters, rawRecordProcessorOpt.get, sconf, retainStartAndEndOffsets, numberOfThreads)
.reduce(_ + _)
}
}
Expand Down Expand Up @@ -190,12 +191,13 @@ object SparkCobolProcessor {
readerParameters: ReaderParameters,
rawRecordProcessor: SerializableRawRecordProcessor,
sconf: SerializableConfiguration,
retainStartAndEndOffsets: Boolean,
numberOfThreads: Int
)(implicit spark: SparkSession): RDD[Long] = {
val groupedFiles = listOfFiles.grouped(numberOfThreads).toSeq
val rdd = spark.sparkContext.parallelize(groupedFiles)
rdd.map(group => {
processListOfFiles(group, outputPath, cobolProcessor, readerParameters, rawRecordProcessor, sconf, numberOfThreads)
processListOfFiles(group, outputPath, cobolProcessor, readerParameters, rawRecordProcessor, sconf, retainStartAndEndOffsets, numberOfThreads)
})
}

Expand Down Expand Up @@ -254,6 +256,7 @@ object SparkCobolProcessor {
readerParameters: ReaderParameters,
rawRecordProcessor: SerializableRawRecordProcessor,
sconf: SerializableConfiguration,
retainStartAndEndOffsets: Boolean,
numberOfThreads: Int
): Long = {
val threadPool: ExecutorService = Executors.newFixedThreadPool(numberOfThreads)
Expand Down Expand Up @@ -281,7 +284,18 @@ object SparkCobolProcessor {

val recordCount = UsingUtils.using(new FileStreamer(inputFile, sconf.value, fileStartOffset, maximumBytes)) { ifs =>
UsingUtils.using(new BufferedOutputStream(outputFs.create(outputFile, true))) { ofs =>
cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
if (fileStartOffset > 0 && retainStartAndEndOffsets) {
val tempStream = new FileStreamer(inputFile, sconf.value)
ofs.write(tempStream.next(fileStartOffset))
tempStream.close()
}
val recordsProcessed = cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
if (fileEndOffset > 0 && retainStartAndEndOffsets) {
val tempStream = new FileStreamer(inputFile, sconf.value, maximumBytes + fileStartOffset)
ofs.write(tempStream.next(fileEndOffset))
tempStream.close()
}
recordsProcessed
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import java.io.IOException
* @note This class is not thread-safe and should only be accessed from a single thread
*/
class FileStreamer(filePath: String, hadoopConfig: Configuration, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream {

private val logger: Logger = LoggerFactory.getLogger(this.getClass)

private val hadoopPath = new Path(filePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,15 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
val outputData = readBinaryFile(outputFile)

assert(outputData.sameElements(
Array(-16, -15, -14, -13).map(_.toByte)
Array(7, 7, 7, -16, -15, -14, -13, 8, 8).map(_.toByte)
))

val actual = spark.read
.format("cobol")
.option("copybook_contents", copybook)
.option("record_format", "F")
.option("file_start_offset", "3")
.option("file_end_offset", "2")
.option("pedantic", "true")
.load(outputFile)
.toJSON
Expand Down
Loading