Breaking up a large JDBC write with Spark

We want to copy a large Spark DataFrame into Oracle, but I am finding the tuning options a bit limited. Looking at the Spark documentation, the only related tuning property I could find for a JDBC write is numPartitions.

However, the DataFrame we want to write has 700,000,000 records and the Oracle server only has 32 cores, so I don't want to overload the database with too many concurrent connections. My understanding is that if I set numPartitions to 32, Spark will effectively do a .repartition(32) on the large dataset and then write each partition into Oracle. But 32 partitions is not enough for a dataset this size and will cause memory issues in Spark.

Is there a way to break the job into smaller pieces so it doesn't try to do everything at once, but instead writes 50,000,000 records (or so) at a time?

I was thinking of something like this, but I was hoping there's something more efficient:

// Imagine "df" is the incoming dataframe we want to write.

val threads = 32
val recordsPerThread = 500000
val chunkSize = threads * recordsPerThread
val total = df.count
val chunks = (total/chunkSize).ceil.toInt

val chunkDf = df.withColumn("CHUNK_NUM", rand.multiply(chunks).cast(IntegerType))

for (chunkNum <- 0 to chunks) {
  chunkDf.filter(s"CHUNK_NUM = ${chunkNum}")
    .drop("CHUNK_NUM")
    .write
    .format("jdbc")
    .options(...) // DB info + numPartitions = 32
    .save
}

Basically, I'm dividing the dataset into "chunks", each of which is then written with 32 threads (numPartitions). I feel like there should be a more efficient way of doing this, but I can't seem to find it in the documentation.

I'm also setting batchSize to 10000 to reduce round trips, but I'm still limited by how many threads I want making round trips to Oracle and by how large the partitions in Spark can be.
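For reference, a sketch of the kind of option map the .options(...) placeholder above stands for; the connection URL, table name, and credential environment variables below are hypothetical, not taken from the question:

val jdbcOptions = Map(
  "url"           -> "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1", // hypothetical URL
  "dbtable"       -> "TARGET_SCHEMA.TARGET_TABLE",               // hypothetical table
  "user"          -> sys.env("ORACLE_USER"),                     // assumed environment variables
  "password"      -> sys.env("ORACLE_PASSWORD"),
  "driver"        -> "oracle.jdbc.OracleDriver",
  "numPartitions" -> "32",    // cap on concurrent JDBC connections for the write
  "batchsize"     -> "10000"  // rows sent per batched INSERT round trip
)

// Used in the loop above as: .options(jdbcOptions)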


asked Nov 18, 2024 at 21:03 by Depressio

2 Answers

I was overthinking it. We can constrain how much Spark writes at once simply by constraining the resources we give Spark. If I set numPartitions to 500 but only give Spark a single 32-core worker, it will only write 32 partitions at a time, limiting how hard we hammer Oracle. This effectively "chunks" the job.
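A minimal sketch of that approach, assuming a hypothetical Oracle URL, table, and credential environment variables. The DataFrame is spread over 500 partitions with repartition (on writes, the JDBC numPartitions option only coalesces down, so it cannot increase the partition count), and because the cluster only exposes 32 cores, at most 32 partitions, and therefore 32 JDBC connections, are active at any one time:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical connection details -- replace with your own.
val jdbcUrl     = "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1"
val targetTable = "TARGET_SCHEMA.TARGET_TABLE"

def writeToOracle(df: DataFrame): Unit = {
  df.repartition(500)                                  // many small partitions...
    .write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", targetTable)
    .option("user", sys.env("ORACLE_USER"))            // assumed environment variables
    .option("password", sys.env("ORACLE_PASSWORD"))
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("batchsize", "10000")                      // rows per JDBC batch
    .mode(SaveMode.Append)
    .save()                                            // ...but a 32-core worker only runs 32 write tasks at once
}

With 700,000,000 rows split across 500 partitions, each write task handles roughly 1.4 million rows, which keeps individual partitions small while the cluster size caps the load on Oracle.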

In your case I would simply repartition:

val threads = 32

df.repartition(threads).write.format("jdbc").options(...).save()

