Spark doesn't find one column, even though it just prints it in the schema:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.AxisLocation;
import org.jfree.chart.axis.LogarithmicAxis;
import org.jfree.chart.axis.NumberAxis;
import org.jfree.chart.axis.NumberTickUnit;
import org.jfree.chart.labels.StandardXYToolTipGenerator;
import org.jfree.chart.plot.XYPlot;
import org.jfree.chart.renderer.PaintScale;
import org.jfree.chart.renderer.xy.XYBlockRenderer;
import org.jfree.chart.title.PaintScaleLegend;
import org.jfree.chart.ui.RectangleEdge;
import org.jfree.chart.ui.RectangleInsets;
import org.jfree.data.xy.DefaultXYZDataset;
import org.jfree.data.xy.XYZDataset;
import javax.imageio.ImageIO;
import javax.swing.*;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.*;
import static org.apache.spark.sql.functions.*;
public class GraphDrawer3 {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Graph Drawer")
.master("local[*]")
.getOrCreate();
String logFilePath = "D:\\filtered_logs_with_useragent";
Dataset<Row> logs = spark.read()
.option("delimiter", "\t")
.option("header", "true")
.csv(logFilePath);
// Checking column names
logs.printSchema();
logs.show(5, false);
logs = logs
        //.withColumn("timestamp_long", col("timestamp").cast("long"))
        .withColumn("event_time", from_unixtime(col("timestamp"))) // <-- fails here
.withColumn("time_window", unix_timestamp(window(col("event_time"), "1 minutes").getField("start")))
.withColumn("duration", col("duration").cast("double"))
.withColumn("out_bytes", col("out_bytes").cast("double"))
.na().fill(0, new String[]{"duration"})
.withColumn("duration_bucket", floor(col("out_bytes").divide(1000)).multiply(1000));
Dataset<Row> graphData = logs
.groupBy("time_window", "duration_bucket")
.agg(count("*").alias("request_count"))
.orderBy("time_window", "duration_bucket");
long count = graphData.count();
if (count == 0) {
System.out.println("❌ No data!");
return;
}
graphData.show(50, false);
DefaultXYZDataset dataset = new DefaultXYZDataset();
double[] xValues, yValues, zValues;
double maxRequests;
Object[] dataArrays = prepareData(graphData);
xValues = (double[]) dataArrays[0];
yValues = (double[]) dataArrays[1];
zValues = (double[]) dataArrays[2];
maxRequests = (double) dataArrays[3];
dataset.addSeries("Heatmap Data", new double[][]{xValues, yValues, zValues});
JFreeChart chart = createChart(dataset, xValues, yValues, maxRequests);
displayChart(chart);
saveChartAsPNG(chart, "D:\\output\\graph_03_03_2025_dalvik.png");
spark.stop();
}
private static Object[] prepareData(Dataset<Row> graphData) {
List<Row> rows = graphData.collectAsList();
Set<Double> uniqueX = new TreeSet<>();
Set<Double> uniqueY = new TreeSet<>();
for (Row row : rows) {
uniqueX.add(((Long) row.getAs("time_window")).doubleValue());
uniqueY.add(((Long) row.getAs("duration_bucket")).doubleValue());
}
int xSize = uniqueX.size();
int ySize = uniqueY.size();
int totalSize = xSize * ySize;
double[] xvalues = new double[totalSize];
double[] yvalues = new double[totalSize];
double[] zvalues = new double[totalSize];
int index = 0;
for (double x : uniqueX) {
for (double y : uniqueY) {
xvalues[index] = x;
yvalues[index] = y;
zvalues[index] = 0;
index++;
}
}
for (Row row : rows) {
double x = ((Long) row.getAs("time_window")).doubleValue();
double y = ((Long) row.getAs("duration_bucket")).doubleValue();
double z = ((Long) row.getAs("request_count")).doubleValue();
int idx = (new ArrayList<>(uniqueX)).indexOf(x) * ySize + (new ArrayList<>(uniqueY)).indexOf(y);
zvalues[idx] = z;
}
double maxRequests = Arrays.stream(zvalues).max().orElse(1);
return new Object[]{xvalues, yvalues, zvalues, maxRequests};
}
private static JFreeChart createChart(XYZDataset dataset, double[] xValues, double[] yValues, double maxRequests) {
NumberAxis xAxis = new NumberAxis("Timestamp (30-min Windows)");
xAxis.setAutoRangeIncludesZero(false);
//xAxis.setDateFormatOverride(new SimpleDateFormat("yyyy-MM-dd HH:mm"));
NumberAxis yAxis = new NumberAxis("Duration (sec)");
yAxis.setAutoRangeIncludesZero(false);
yAxis.setTickUnit(new NumberTickUnit(10));
XYPlot plot = new XYPlot(dataset, xAxis, yAxis, null);
XYBlockRenderer renderer = new XYBlockRenderer();
double xMin = Arrays.stream(xValues).min().orElse(0);
double xMax = Arrays.stream(xValues).max().orElse(1);
double yMin = Arrays.stream(yValues).min().orElse(0);
double yMax = Arrays.stream(yValues).max().orElse(1);
long uniqueXCount = Arrays.stream(xValues).distinct().count();
long uniqueYCount = Arrays.stream(yValues).distinct().count();
double blockWidth = (xMax - xMin) / uniqueXCount;
double blockHeight = (yMax - yMin) / uniqueYCount;
renderer.setBlockWidth(blockWidth);
renderer.setBlockHeight(blockHeight);
renderer.setDefaultToolTipGenerator(new StandardXYToolTipGenerator());
renderer.setPaintScale(new SpectrumPaintScale(1, maxRequests));
plot.setRenderer(renderer);
JFreeChart chart = new JFreeChart("Heatmap: Requests by Time and Duration",
JFreeChart.DEFAULT_TITLE_FONT, plot, false);
LogarithmicAxis zAxis = new LogarithmicAxis("Request Count");
zAxis.setAutoRangeIncludesZero(false);
zAxis.setAllowNegativesFlag(false);
zAxis.setLowerBound(1);
zAxis.setUpperBound(maxRequests);
PaintScaleLegend legend = new PaintScaleLegend(new SpectrumPaintScale(1, maxRequests), zAxis);
legend.setSubdivisionCount(128);
legend.setAxisLocation(AxisLocation.TOP_OR_RIGHT);
legend.setPadding(new RectangleInsets(25, 10, 50, 10));
legend.setStripWidth(20);
legend.setPosition(RectangleEdge.RIGHT);
legend.setBackgroundPaint(Color.WHITE);
chart.addSubtitle(legend);
return chart;
}
private static void displayChart(JFreeChart chart) {
JFrame frame = new JFrame("Heatmap Visualization");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
ChartPanel chartPanel = new ChartPanel(chart);
chartPanel.setPreferredSize(new Dimension(1400, 800));
chartPanel.setMouseZoomable(true, false);
frame.add(chartPanel);
frame.pack();
frame.setLocationRelativeTo(null);
frame.setVisible(true);
}
private static void saveChartAsPNG(JFreeChart chart, String filePath) {
try {
File file = new File(filePath);
ImageIO.write(chart.createBufferedImage(1200, 600), "png", file);
} catch (IOException e) {
e.printStackTrace();
}
}
private static class SpectrumPaintScale implements PaintScale {
private static final float H1 = 0.7f;
private static final float H2 = 0.0f;
private final double lowerBound;
private final double upperBound;
private final double logLower;
private final double logUpper;
public SpectrumPaintScale(double lowerBound, double upperBound) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.logLower = Math.log10(Math.max(lowerBound, 1));
this.logUpper = Math.log10(Math.max(upperBound, 1));
}
@Override
public double getLowerBound() {
return lowerBound;
}
@Override
public double getUpperBound() {
return upperBound;
}
@Override
public Paint getPaint(double value) {
if (value <= lowerBound) {
return Color.getHSBColor(H1, 1f, 1f);
}
if (value >= upperBound) {
return Color.getHSBColor(H2, 1f, 1f);
}
double logValue = Math.log10(Math.max(value, 1));
float scaledValue = (float) ((logValue - logLower) / (logUpper - logLower));
float scaledH = H1 + scaledValue * (H2 - H1);
return Color.getHSBColor(scaledH, 1f, 1f);
}
}
}
and that was the output to the logs before the fail:
25/03/21 14:14:46 INFO Executor: Finished task 0.0 in stage 2.0 (TID 79). 2007 bytes result sent to driver
25/03/21 14:14:46 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 79) in 32 ms on ILYA.mshome (executor driver) (1/1)
25/03/21 14:14:46 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
25/03/21 14:14:46 INFO DAGScheduler: ResultStage 2 (show at GraphDrawer3.java:52) finished in 0.042 s
25/03/21 14:14:46 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
25/03/21 14:14:46 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
25/03/21 14:14:46 INFO DAGScheduler: Job 2 finished: show at GraphDrawer3.java:52, took 0.045052 s
25/03/21 14:14:46 INFO CodeGenerator: Code generated in 9.4934 ms
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1741031986.377,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/7907728753176af1c64284d1c873a838a4a93b071.jpg?w=300 HTTP/1.1,200,25713,26186,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.404,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/79077289012d0f384d3e14982adfcd7286073cfa9.jpg?w=300 HTTP/1.1,200,30450,30923,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.418,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/7907728753176af1c64284d1c873a838a4a93b071.jpg?w=534 HTTP/1.1,200,31499,31973,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0.004,0.004,200,RU,static,HIT,n_c,|
|1741031986.663,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/792693224cbf7cacd86b0408285b116b674fb674d.jpg?w=300 HTTP/1.1,200,45336,45809,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.787,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/79163443747900bf957fd420fa10b1b447010b4a5.jpg?w=598 HTTP/1.1,200,59851,60325,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0.004,0.004,200,RU,static,HIT,n_c,|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'timestamp' does not exist. Did you mean one of the following? [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field];
'Project [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field#17, from_unixtime('timestamp, yyyy-MM-dd HH:mm:ss, Some(Europe/Moscow)) AS event_time#26]
+- Relation [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field#17] csv
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7(CheckAnalysis.scala:200)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7$adapted(CheckAnalysis.scala:193)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6(CheckAnalysis.scala:193)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6$adapted(CheckAnalysis.scala:193)
at scala.collection.immutable.Stream.foreach(Stream.scala:533)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:193)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:102)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:102)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:97)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:187)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:210)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2542)
at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2480)
at cdnloganalysis.GraphDrawer3.main(GraphDrawer3.java:57)
asked Mar 21 at 18:00 by Eljah
- All the awt, swing and jfree code is not needed to reproduce the issue. I recommend replacing it with a minimal reproducible example. – aled, Mar 21 at 19:39
1 Answer
The wrong part here was forcing the delimiter:
.option("delimiter", "\t")
The files are comma-separated, so with a tab delimiter Spark read each row into a single column whose name is the entire comma-joined header line (visible in the show() output and in the exception's "Did you mean" list). There was no column named just timestamp, and its values had the wrong type.
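A minimal sketch of the corrected read, assuming the files really are comma-separated as the single-column show() output above suggests (comma is also Spark's default CSV delimiter, so the option could simply be dropped):

Dataset<Row> logs = spark.read()
        .option("delimiter", ",")   // match the actual separator in the data
        .option("header", "true")
        .csv(logFilePath);

// With the right delimiter, "timestamp" resolves as its own column. Its values
// are fractional epoch seconds (e.g. 1741031986.377), so cast to double first;
// from_unixtime then truncates to whole seconds:
logs = logs.withColumn("event_time",
        from_unixtime(col("timestamp").cast("double")));

After this, printSchema() shows the 21 individual columns instead of one giant column, and the rest of the pipeline (windowing, bucketing, aggregation) can resolve them normally.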
Spark doesn't find one column while njust prints it in the schema:
import .apache.spark.sql.Dataset;
import .apache.spark.sql.Row;
import .apache.spark.sql.SparkSession;
import .jfree.chart.ChartPanel;
import .jfree.chart.JFreeChart;
import .jfree.chart.axis.AxisLocation;
import .jfree.chart.axis.LogarithmicAxis;
import .jfree.chart.axis.NumberAxis;
import .jfree.chart.axis.NumberTickUnit;
import .jfree.chart.labels.StandardXYToolTipGenerator;
import .jfree.chart.plot.XYPlot;
import .jfree.chart.renderer.PaintScale;
import .jfree.chart.renderer.xy.XYBlockRenderer;
import .jfree.chart.title.PaintScaleLegend;
import .jfree.chart.ui.RectangleEdge;
import .jfree.chart.ui.RectangleInsets;
import .jfree.data.xy.DefaultXYZDataset;
import .jfree.data.xy.XYZDataset;
import javax.imageio.ImageIO;
import javax.swing.*;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.*;
import static .apache.spark.sql.functions.*;
public class GraphDrawer3 {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Graph Drawer")
.master("local[*]")
.getOrCreate();
String logFilePath = "D:\\filtered_logs_with_useragent";
Dataset<Row> logs = spark.read()
.option("delimiter", "\t")
.option("header", "true")
.csv(logFilePath);
// Checking coulmn name
logs.printSchema();
logs.show(5, false);
logs = logs//.
//withColumn("timestamp_long", col("timestamp").cast("long"))
.withColumn("event_time", from_unixtime(col("timestamp"))) //<-- fail here
.withColumn("time_window", unix_timestamp(window(col("event_time"), "1 minutes").getField("start")))
.withColumn("duration", col("duration").cast("double"))
.withColumn("out_bytes", col("out_bytes").cast("double"))
.na().fill(0, new String[]{"duration"})
.withColumn("duration_bucket", floor(col("out_bytes").divide(1000)).multiply(1000));
Dataset<Row> graphData = logs
.groupBy("time_window", "duration_bucket")
.agg(count("*").alias("request_count"))
.orderBy("time_window", "duration_bucket");
long count = graphData.count();
if (count == 0) {
System.out.println("❌ No data!");
return;
}
graphData.show(50, false);
DefaultXYZDataset dataset = new DefaultXYZDataset();
double[] xValues, yValues, zValues;
double maxRequests;
Object[] dataArrays = prepareData(graphData);
xValues = (double[]) dataArrays[0];
yValues = (double[]) dataArrays[1];
zValues = (double[]) dataArrays[2];
maxRequests = (double) dataArrays[3];
dataset.addSeries("Heatmap Data", new double[][]{xValues, yValues, zValues});
JFreeChart chart = createChart(dataset, xValues, yValues, maxRequests);
displayChart(chart);
saveChartAsPNG(chart, "D:\\output\\graph_03_03_2025_dalvik.png");
spark.stop();
}
private static Object[] prepareData(Dataset<Row> graphData) {
List<Row> rows = graphData.collectAsList();
Set<Double> uniqueX = new TreeSet<>();
Set<Double> uniqueY = new TreeSet<>();
for (Row row : rows) {
uniqueX.add(((Long) row.getAs("time_window")).doubleValue());
uniqueY.add(((Long) row.getAs("duration_bucket")).doubleValue());
}
int xSize = uniqueX.size();
int ySize = uniqueY.size();
int totalSize = xSize * ySize;
double[] xvalues = new double[totalSize];
double[] yvalues = new double[totalSize];
double[] zvalues = new double[totalSize];
int index = 0;
for (double x : uniqueX) {
for (double y : uniqueY) {
xvalues[index] = x;
yvalues[index] = y;
zvalues[index] = 0;
index++;
}
}
for (Row row : rows) {
double x = ((Long) row.getAs("time_window")).doubleValue();
double y = ((Long) row.getAs("duration_bucket")).doubleValue();
double z = ((Long) row.getAs("request_count")).doubleValue();
int idx = (new ArrayList<>(uniqueX)).indexOf(x) * ySize + (new ArrayList<>(uniqueY)).indexOf(y);
zvalues[idx] = z;
}
double maxRequests = Arrays.stream(zvalues).max().orElse(1);
return new Object[]{xvalues, yvalues, zvalues, maxRequests};
}
private static JFreeChart createChart(XYZDataset dataset, double[] xValues, double[] yValues, double maxRequests) {
NumberAxis xAxis = new NumberAxis("Timestamp (30-min Windows)");
xAxis.setAutoRangeIncludesZero(false);
//xAxis.setDateFormatOverride(new SimpleDateFormat("yyyy-MM-dd HH:mm"));
NumberAxis yAxis = new NumberAxis("Duration (sec)");
yAxis.setAutoRangeIncludesZero(false);
yAxis.setTickUnit(new NumberTickUnit(10));
XYPlot plot = new XYPlot(dataset, xAxis, yAxis, null);
XYBlockRenderer renderer = new XYBlockRenderer();
double xMin = Arrays.stream(xValues).min().orElse(0);
double xMax = Arrays.stream(xValues).max().orElse(1);
double yMin = Arrays.stream(yValues).min().orElse(0);
double yMax = Arrays.stream(yValues).max().orElse(1);
long uniqueXCount = Arrays.stream(xValues).distinct().count();
long uniqueYCount = Arrays.stream(yValues).distinct().count();
double blockWidth = (xMax - xMin) / uniqueXCount;
double blockHeight = (yMax - yMin) / uniqueYCount;
renderer.setBlockWidth(blockWidth);
renderer.setBlockHeight(blockHeight);
renderer.setDefaultToolTipGenerator(new StandardXYToolTipGenerator());
renderer.setPaintScale(new SpectrumPaintScale(1, maxRequests));
plot.setRenderer(renderer);
JFreeChart chart = new JFreeChart("Heatmap: Requests by Time and Duration",
JFreeChart.DEFAULT_TITLE_FONT, plot, false);
LogarithmicAxis zAxis = new LogarithmicAxis("Request Count");
zAxis.setAutoRangeIncludesZero(false);
zAxis.setAllowNegativesFlag(false);
zAxis.setLowerBound(1);
zAxis.setUpperBound(maxRequests);
PaintScaleLegend legend = new PaintScaleLegend(new SpectrumPaintScale(1, maxRequests), zAxis);
legend.setSubdivisionCount(128);
legend.setAxisLocation(AxisLocation.TOP_OR_RIGHT);
legend.setPadding(new RectangleInsets(25, 10, 50, 10));
legend.setStripWidth(20);
legend.setPosition(RectangleEdge.RIGHT);
legend.setBackgroundPaint(Color.WHITE);
chart.addSubtitle(legend);
return chart;
}
private static void displayChart(JFreeChart chart) {
JFrame frame = new JFrame("Heatmap Visualization");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
ChartPanel chartPanel = new ChartPanel(chart);
chartPanel.setPreferredSize(new Dimension(1400, 800));
chartPanel.setMouseZoomable(true, false);
frame.add(chartPanel);
frame.pack();
frame.setLocationRelativeTo(null);
frame.setVisible(true);
}
private static void saveChartAsPNG(JFreeChart chart, String filePath) {
try {
File file = new File(filePath);
ImageIO.write(chart.createBufferedImage(1200, 600), "png", file);
} catch (IOException e) {
e.printStackTrace();
}
}
private static class SpectrumPaintScale implements PaintScale {
private static final float H1 = 0.7f;
private static final float H2 = 0.0f;
private final double lowerBound;
private final double upperBound;
private final double logLower;
private final double logUpper;
public SpectrumPaintScale(double lowerBound, double upperBound) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.logLower = Math.log10(Math.max(lowerBound, 1));
this.logUpper = Math.log10(Math.max(upperBound, 1));
}
@Override
public double getLowerBound() {
return lowerBound;
}
@Override
public double getUpperBound() {
return upperBound;
}
@Override
public Paint getPaint(double value) {
if (value <= lowerBound) {
return Color.getHSBColor(H1, 1f, 1f);
}
if (value >= upperBound) {
return Color.getHSBColor(H2, 1f, 1f);
}
double logValue = Math.log10(Math.max(value, 1));
float scaledValue = (float) ((logValue - logLower) / (logUpper - logLower));
float scaledH = H1 + scaledValue * (H2 - H1);
return Color.getHSBColor(scaledH, 1f, 1f);
}
}
}
and that was the output to the logs before the fail:
25/03/21 14:14:46 INFO Executor: Finished task 0.0 in stage 2.0 (TID 79). 2007 bytes result sent to driver
25/03/21 14:14:46 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 79) in 32 ms on ILYA.mshome (executor driver) (1/1)
25/03/21 14:14:46 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
25/03/21 14:14:46 INFO DAGScheduler: ResultStage 2 (show at GraphDrawer3.java:52) finished in 0.042 s
25/03/21 14:14:46 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
25/03/21 14:14:46 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
25/03/21 14:14:46 INFO DAGScheduler: Job 2 finished: show at GraphDrawer3.java:52, took 0.045052 s
25/03/21 14:14:46 INFO CodeGenerator: Code generated in 9.4934 ms
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1741031986.377,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/7907728753176af1c64284d1c873a838a4a93b071.jpg?w=300 HTTP/1.1,200,25713,26186,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.404,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/79077289012d0f384d3e14982adfcd7286073cfa9.jpg?w=300 HTTP/1.1,200,30450,30923,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.418,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/7907728753176af1c64284d1c873a838a4a93b071.jpg?w=534 HTTP/1.1,200,31499,31973,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0.004,0.004,200,RU,static,HIT,n_c,|
|1741031986.663,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/792693224cbf7cacd86b0408285b116b674fb674d.jpg?w=300 HTTP/1.1,200,45336,45809,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.787,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/79163443747900bf957fd420fa10b1b447010b4a5.jpg?w=598 HTTP/1.1,200,59851,60325,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0.004,0.004,200,RU,static,HIT,n_c,|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
Exception in thread "main" .apache.spark.sql.AnalysisException: Column 'timestamp' does not exist. Did you mean one of the following? [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field];
'Project [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field#17, from_unixtime('timestamp, yyyy-MM-dd HH:mm:ss, Some(Europe/Moscow)) AS event_time#26]
+- Relation [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field#17] csv
at .apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7(CheckAnalysis.scala:200)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7$adapted(CheckAnalysis.scala:193)
at .apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at .apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at .apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at .apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at .apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at .apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at .apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6(CheckAnalysis.scala:193)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6$adapted(CheckAnalysis.scala:193)
at scala.collection.immutable.Stream.foreach(Stream.scala:533)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:193)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:102)
at .apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:102)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:97)
at .apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:187)
at .apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:210)
at .apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at .apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
at .apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at .apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at .apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at .apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at .apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at .apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at .apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at .apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at .apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at .apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at .apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at .apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at .apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at .apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
at .apache.spark.sql.Dataset.select(Dataset.scala:1519)
at .apache.spark.sql.Dataset.withColumns(Dataset.scala:2542)
at .apache.spark.sql.Dataset.withColumn(Dataset.scala:2480)
at cdnloganalysis.GraphDrawer3.main(GraphDrawer3.java:57)
Spark doesn't find one column while njust prints it in the schema:
import .apache.spark.sql.Dataset;
import .apache.spark.sql.Row;
import .apache.spark.sql.SparkSession;
import .jfree.chart.ChartPanel;
import .jfree.chart.JFreeChart;
import .jfree.chart.axis.AxisLocation;
import .jfree.chart.axis.LogarithmicAxis;
import .jfree.chart.axis.NumberAxis;
import .jfree.chart.axis.NumberTickUnit;
import .jfree.chart.labels.StandardXYToolTipGenerator;
import .jfree.chart.plot.XYPlot;
import .jfree.chart.renderer.PaintScale;
import .jfree.chart.renderer.xy.XYBlockRenderer;
import .jfree.chart.title.PaintScaleLegend;
import .jfree.chart.ui.RectangleEdge;
import .jfree.chart.ui.RectangleInsets;
import .jfree.data.xy.DefaultXYZDataset;
import .jfree.data.xy.XYZDataset;
import javax.imageio.ImageIO;
import javax.swing.*;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.*;
import static .apache.spark.sql.functions.*;
public class GraphDrawer3 {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Graph Drawer")
.master("local[*]")
.getOrCreate();
String logFilePath = "D:\\filtered_logs_with_useragent";
Dataset<Row> logs = spark.read()
.option("delimiter", "\t")
.option("header", "true")
.csv(logFilePath);
// Checking coulmn name
logs.printSchema();
logs.show(5, false);
logs = logs//.
//withColumn("timestamp_long", col("timestamp").cast("long"))
.withColumn("event_time", from_unixtime(col("timestamp"))) //<-- fail here
.withColumn("time_window", unix_timestamp(window(col("event_time"), "1 minutes").getField("start")))
.withColumn("duration", col("duration").cast("double"))
.withColumn("out_bytes", col("out_bytes").cast("double"))
.na().fill(0, new String[]{"duration"})
.withColumn("duration_bucket", floor(col("out_bytes").divide(1000)).multiply(1000));
Dataset<Row> graphData = logs
.groupBy("time_window", "duration_bucket")
.agg(count("*").alias("request_count"))
.orderBy("time_window", "duration_bucket");
long count = graphData.count();
if (count == 0) {
System.out.println("❌ No data!");
return;
}
graphData.show(50, false);
DefaultXYZDataset dataset = new DefaultXYZDataset();
double[] xValues, yValues, zValues;
double maxRequests;
Object[] dataArrays = prepareData(graphData);
xValues = (double[]) dataArrays[0];
yValues = (double[]) dataArrays[1];
zValues = (double[]) dataArrays[2];
maxRequests = (double) dataArrays[3];
dataset.addSeries("Heatmap Data", new double[][]{xValues, yValues, zValues});
JFreeChart chart = createChart(dataset, xValues, yValues, maxRequests);
displayChart(chart);
saveChartAsPNG(chart, "D:\\output\\graph_03_03_2025_dalvik.png");
spark.stop();
}
private static Object[] prepareData(Dataset<Row> graphData) {
List<Row> rows = graphData.collectAsList();
Set<Double> uniqueX = new TreeSet<>();
Set<Double> uniqueY = new TreeSet<>();
for (Row row : rows) {
uniqueX.add(((Long) row.getAs("time_window")).doubleValue());
uniqueY.add(((Long) row.getAs("duration_bucket")).doubleValue());
}
int xSize = uniqueX.size();
int ySize = uniqueY.size();
int totalSize = xSize * ySize;
double[] xvalues = new double[totalSize];
double[] yvalues = new double[totalSize];
double[] zvalues = new double[totalSize];
int index = 0;
for (double x : uniqueX) {
for (double y : uniqueY) {
xvalues[index] = x;
yvalues[index] = y;
zvalues[index] = 0;
index++;
}
}
for (Row row : rows) {
double x = ((Long) row.getAs("time_window")).doubleValue();
double y = ((Long) row.getAs("duration_bucket")).doubleValue();
double z = ((Long) row.getAs("request_count")).doubleValue();
int idx = (new ArrayList<>(uniqueX)).indexOf(x) * ySize + (new ArrayList<>(uniqueY)).indexOf(y);
zvalues[idx] = z;
}
double maxRequests = Arrays.stream(zvalues).max().orElse(1);
return new Object[]{xvalues, yvalues, zvalues, maxRequests};
}
private static JFreeChart createChart(XYZDataset dataset, double[] xValues, double[] yValues, double maxRequests) {
NumberAxis xAxis = new NumberAxis("Timestamp (30-min Windows)");
xAxis.setAutoRangeIncludesZero(false);
//xAxis.setDateFormatOverride(new SimpleDateFormat("yyyy-MM-dd HH:mm"));
NumberAxis yAxis = new NumberAxis("Duration (sec)");
yAxis.setAutoRangeIncludesZero(false);
yAxis.setTickUnit(new NumberTickUnit(10));
XYPlot plot = new XYPlot(dataset, xAxis, yAxis, null);
XYBlockRenderer renderer = new XYBlockRenderer();
double xMin = Arrays.stream(xValues).min().orElse(0);
double xMax = Arrays.stream(xValues).max().orElse(1);
double yMin = Arrays.stream(yValues).min().orElse(0);
double yMax = Arrays.stream(yValues).max().orElse(1);
long uniqueXCount = Arrays.stream(xValues).distinct().count();
long uniqueYCount = Arrays.stream(yValues).distinct().count();
double blockWidth = (xMax - xMin) / uniqueXCount;
double blockHeight = (yMax - yMin) / uniqueYCount;
renderer.setBlockWidth(blockWidth);
renderer.setBlockHeight(blockHeight);
renderer.setDefaultToolTipGenerator(new StandardXYToolTipGenerator());
renderer.setPaintScale(new SpectrumPaintScale(1, maxRequests));
plot.setRenderer(renderer);
JFreeChart chart = new JFreeChart("Heatmap: Requests by Time and Duration",
JFreeChart.DEFAULT_TITLE_FONT, plot, false);
LogarithmicAxis zAxis = new LogarithmicAxis("Request Count");
zAxis.setAutoRangeIncludesZero(false);
zAxis.setAllowNegativesFlag(false);
zAxis.setLowerBound(1);
zAxis.setUpperBound(maxRequests);
PaintScaleLegend legend = new PaintScaleLegend(new SpectrumPaintScale(1, maxRequests), zAxis);
legend.setSubdivisionCount(128);
legend.setAxisLocation(AxisLocation.TOP_OR_RIGHT);
legend.setPadding(new RectangleInsets(25, 10, 50, 10));
legend.setStripWidth(20);
legend.setPosition(RectangleEdge.RIGHT);
legend.setBackgroundPaint(Color.WHITE);
chart.addSubtitle(legend);
return chart;
}
private static void displayChart(JFreeChart chart) {
JFrame frame = new JFrame("Heatmap Visualization");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
ChartPanel chartPanel = new ChartPanel(chart);
chartPanel.setPreferredSize(new Dimension(1400, 800));
chartPanel.setMouseZoomable(true, false);
frame.add(chartPanel);
frame.pack();
frame.setLocationRelativeTo(null);
frame.setVisible(true);
}
private static void saveChartAsPNG(JFreeChart chart, String filePath) {
try {
File file = new File(filePath);
ImageIO.write(chart.createBufferedImage(1200, 600), "png", file);
} catch (IOException e) {
e.printStackTrace();
}
}
private static class SpectrumPaintScale implements PaintScale {
private static final float H1 = 0.7f;
private static final float H2 = 0.0f;
private final double lowerBound;
private final double upperBound;
private final double logLower;
private final double logUpper;
public SpectrumPaintScale(double lowerBound, double upperBound) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.logLower = Math.log10(Math.max(lowerBound, 1));
this.logUpper = Math.log10(Math.max(upperBound, 1));
}
@Override
public double getLowerBound() {
return lowerBound;
}
@Override
public double getUpperBound() {
return upperBound;
}
@Override
public Paint getPaint(double value) {
if (value <= lowerBound) {
return Color.getHSBColor(H1, 1f, 1f);
}
if (value >= upperBound) {
return Color.getHSBColor(H2, 1f, 1f);
}
double logValue = Math.log10(Math.max(value, 1));
float scaledValue = (float) ((logValue - logLower) / (logUpper - logLower));
float scaledH = H1 + scaledValue * (H2 - H1);
return Color.getHSBColor(scaledH, 1f, 1f);
}
}
}
and that was the output to the logs before the fail:
25/03/21 14:14:46 INFO Executor: Finished task 0.0 in stage 2.0 (TID 79). 2007 bytes result sent to driver
25/03/21 14:14:46 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 79) in 32 ms on ILYA.mshome (executor driver) (1/1)
25/03/21 14:14:46 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
25/03/21 14:14:46 INFO DAGScheduler: ResultStage 2 (show at GraphDrawer3.java:52) finished in 0.042 s
25/03/21 14:14:46 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
25/03/21 14:14:46 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
25/03/21 14:14:46 INFO DAGScheduler: Job 2 finished: show at GraphDrawer3.java:52, took 0.045052 s
25/03/21 14:14:46 INFO CodeGenerator: Code generated in 9.4934 ms
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1741031986.377,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/7907728753176af1c64284d1c873a838a4a93b071.jpg?w=300 HTTP/1.1,200,25713,26186,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.404,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/79077289012d0f384d3e14982adfcd7286073cfa9.jpg?w=300 HTTP/1.1,200,30450,30923,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.418,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/7907728753176af1c64284d1c873a838a4a93b071.jpg?w=534 HTTP/1.1,200,31499,31973,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0.004,0.004,200,RU,static,HIT,n_c,|
|1741031986.663,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/792693224cbf7cacd86b0408285b116b674fb674d.jpg?w=300 HTTP/1.1,200,45336,45809,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c, |
|1741031986.787,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/79163443747900bf957fd420fa10b1b447010b4a5.jpg?w=598 HTTP/1.1,200,59851,60325,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0.004,0.004,200,RU,static,HIT,n_c,|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
Exception in thread "main" .apache.spark.sql.AnalysisException: Column 'timestamp' does not exist. Did you mean one of the following? [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field];
'Project [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field#17, from_unixtime('timestamp, yyyy-MM-dd HH:mm:ss, Some(Europe/Moscow)) AS event_time#26]
+- Relation [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field#17] csv
at .apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7(CheckAnalysis.scala:200)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7$adapted(CheckAnalysis.scala:193)
at .apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at .apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at .apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at .apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at .apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at .apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at .apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6(CheckAnalysis.scala:193)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6$adapted(CheckAnalysis.scala:193)
at scala.collection.immutable.Stream.foreach(Stream.scala:533)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:193)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:102)
at .apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:102)
at .apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:97)
at .apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:187)
at .apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:210)
at .apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at .apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
at .apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at .apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at .apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at .apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at .apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at .apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at .apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at .apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at .apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at .apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at .apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at .apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at .apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at .apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
at .apache.spark.sql.Dataset.select(Dataset.scala:1519)
at .apache.spark.sql.Dataset.withColumns(Dataset.scala:2542)
at .apache.spark.sql.Dataset.withColumn(Dataset.scala:2480)
at cdnloganalysis.GraphDrawer3.main(GraphDrawer3.java:57)
asked Mar 21 at 18:00 by Eljah
– All the AWT, Swing, and JFreeChart code is not needed to reproduce the issue. I recommend replacing it with a minimal reproducible example. – aled (Mar 21 at 19:39)
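Along the lines of that comment, a minimal reproduction could look like the sketch below. The class name and sample path are hypothetical; it assumes a small comma-separated file with the same comma-joined header as the question's data:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class DelimiterRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
        // Same read options as the question: a tab delimiter forced on a comma-separated file.
        Dataset<Row> logs = spark.read()
                .option("delimiter", "\t")
                .option("header", "true")
                .csv("sample_logs.csv"); // hypothetical sample path
        logs.printSchema(); // "prints" timestamp, but only inside one giant column name
        // Fails with AnalysisException: no column is actually named "timestamp".
        logs.withColumn("event_time", from_unixtime(col("timestamp"))).show();
        spark.stop();
    }
}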
1 Answer
The wrong piece here was forcing the delimiter:
.option("delimiter", "\t")
The file is actually comma-separated, so with a tab delimiter Spark parsed every row, header included, as a single field. The resulting DataFrame has one column whose name is the entire comma-joined header string, which is why printSchema() appears to print a timestamp column while the analyzer cannot resolve one (the "Did you mean" suggestion in the exception is that one mega-column).
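For reference, a corrected read could look like this sketch. It assumes the file really is comma-separated, as the show() output above indicates: dropping the delimiter option falls back to Spark's CSV default of ",", so every header field becomes its own column. The double-then-long cast is an extra precaution for the fractional epoch seconds in the data (e.g. 1741031986.377), since from_unixtime expects whole seconds:

Dataset<Row> logs = spark.read()
        .option("header", "true") // default delimiter is ",", so no override is needed
        .csv(logFilePath);

logs = logs
        // Truncate the fractional part, then convert epoch seconds to a timestamp string.
        .withColumn("event_time",
                from_unixtime(col("timestamp").cast("double").cast("long")));

With that change the timestamp, duration, and out_bytes columns resolve individually and the rest of the pipeline in the question can run unmodified.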