Traffic Flow Analysis with Spark: End of Queue Detection

Contact: Ahmad Al-Shishtawy (email)

Contents:

  • Introduction
  • Traffic Flow Data
  • Data Exploration
  • Fundamental Diagrams of Traffic Flow
  • End of Queue Detection
  • Visualization
  • Next Steps

Introduction

Hadoop Ecosystem

An ecosystem for storing and processing large amounts of data.

Basic components:

  • Distributed storage
  • Computation engine (MapReduce)
  • Libraries
  • Applications

Designed to run reliably on commodity servers

Hadoop Ecosystem

Apache Spark

  • General-purpose, in-memory data processing engine
  • Integrates very well with the Hadoop Ecosystem (and much more)
  • Unified engine supporting SQL queries, streaming data, machine learning, and graph processing
  • Open source Apache project

Spark

Credits

The Hadoop and Spark diagrams above are from the Spark for Dummies Free Book

Jupyter Notebook

Well, this is a Jupyter notebook :)

It is an interactive document that contains live code, equations, visualizations, and explanatory text.

Just like this example below. Try changing the code and then execute the cell by pressing (Shift+Enter).

In [1]:
print('Hello World!!')

x = 2
y = 100

print('{} + {} = {}'.format(x, y, x+y))

print('{}^{} = {}'.format(x, y, x**y))
Hello World!!
2 + 100 = 102
2^100 = 1267650600228229401496703205376

Jupyter has become the de facto standard used by many data scientists to interactively analyze data.

Jupyter

Traffic Flow Data

In this demo we will be using traffic flow data generated from radar detectors (sensors) that are part of the Motorway Control System (MCS) distributed along the highways in Stockholm (see the picture below for an example).

The detectors generate data every minute. The data contains the average speed and the number of cars that passed in that minute.

The sensors are identified by two fields:

  • Ds_Reference: such as "E4S 61,036", which is composed of the road name "E4", the direction "S" for south, and the kilometer reference (see the small parsing example after this list).
  • Detector_Number: Detectors are numbered from right to left starting from 1 for the first detector in the rightmost lane.
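
To make the Ds_Reference format concrete, here is a small illustrative parse in plain Python; the value used is the example above, and the actual Spark version is defined later in the Data Wrangling section.

# Illustrative only: parse a Ds_Reference value such as 'E4S 61,036'.
# The kilometer reference uses a comma as the decimal separator (Swedish format).
ref = 'E4S 61,036'
road, km_ref = ref.split(' ')         # 'E4S' and '61,036'
km, m = km_ref.split(',')             # '61' and '036'
position_m = int(km) * 1000 + int(m)  # 61036 meters along the road
print(road, position_m)               # E4S 61036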

Flow Sensors

Data Exploration

We will use Spark to explore and analyze the traffic flow data. But first we need to load the data and do some cleanup.

Starting Spark

We import some libraries that we will be using, then initialize Spark.

In [2]:
### Uncomment to install missing packages as needed (you may need to restart the kernel)

### 1 - ipyleaflet
### Conda package is old (ver 0.3) so using pip instead (ver 0.4)
# !pip install -U ipyleaflet 
### or
# !conda install -y -c conda-forge ipyleaflet

### 2 - geojson
# !conda install -y geojson
In [3]:
from ipyleaflet import Map, GeoJSON, TileLayer
from geojson import FeatureCollection, Feature, MultiPolygon
import json
from  datetime import timedelta, datetime
import time
import ipywidgets as widgets

import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline 
In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, lag, datediff, unix_timestamp
from pyspark.sql.window import Window
In [5]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Demo4') \
    .config('spark.executor.memory', '5g') \
    .config('spark.driver.memory', '5g') \
    .getOrCreate()
In [6]:
spark.version
Out[6]:
'2.2.0'

Loading The Data

The data is stored in a CSV file (comma-separated values). Spark can parse CSV files and infer the schema, but it is better to define the schema explicitly, especially when dealing with very large datasets.
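
For comparison, a sketch of letting Spark infer the schema instead: inference needs an extra pass over the data, and since this file has no header row the columns would get default names such as _c0, _c1, and so on.

# Sketch only: infer the schema instead of defining it explicitly.
df_inferred = spark.read.csv('data/sample_E4N.csv', sep=';', inferSchema=True,
                             timestampFormat='yyyy-MM-dd HH:mm:ss.SSS')
df_inferred.printSchema()  # types are inferred, column names default to _c0, _c1, ...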

Define the schema of the CSV file

In [7]:
schema_flow = StructType().add('Timestamp', TimestampType(), False) \
        .add('Ds_Reference', StringType(), False) \
        .add('Detector_Number', ShortType(), False) \
        .add('Traffic_Direction', ShortType(), False) \
        .add('Flow_In', ShortType(), False) \
        .add('Average_Speed', ShortType(), False) \
        .add('Sign_Aid_Det_Comms', ShortType(), False) \
        .add('Status', ShortType(), False) \
        .add('Legend_Group', ShortType(), False) \
        .add('Legend_Sign', ShortType(), False) \
        .add('Legend_SubSign', ShortType(), False) \
        .add('Protocol_Version', StringType(), False)        

Read the data

Next we read the data and load it into Spark. The data is represented in Spark as a DataFrame, which is similar to a table in a database.

In [8]:
df_raw = spark.read.csv('data/sample_E4N.csv', sep=';', schema=schema_flow, ignoreLeadingWhiteSpace=True, \
                    ignoreTrailingWhiteSpace=True, timestampFormat='yyyy-MM-dd HH:mm:ss.SSS')

We can inspect the schema to check it.

In [9]:
df_raw.printSchema()
root
 |-- Timestamp: timestamp (nullable = true)
 |-- Ds_Reference: string (nullable = true)
 |-- Detector_Number: short (nullable = true)
 |-- Traffic_Direction: short (nullable = true)
 |-- Flow_In: short (nullable = true)
 |-- Average_Speed: short (nullable = true)
 |-- Sign_Aid_Det_Comms: short (nullable = true)
 |-- Status: short (nullable = true)
 |-- Legend_Group: short (nullable = true)
 |-- Legend_Sign: short (nullable = true)
 |-- Legend_SubSign: short (nullable = true)
 |-- Protocol_Version: string (nullable = true)

Count the records

We count the number of records in the DataFrame. We can add %%time at the beginning of any cell in Jupyter to measure the execution time of the cell.

In [10]:
%%time
df_raw.count()
CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 2.09 s
Out[10]:
675360
In [11]:
df_raw.show(10)
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
|          Timestamp|Ds_Reference|Detector_Number|Traffic_Direction|Flow_In|Average_Speed|Sign_Aid_Det_Comms|Status|Legend_Group|Legend_Sign|Legend_SubSign|Protocol_Version|
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
|2016-11-01 00:00:00|  E4N 47,465|             49|               78|      0|          252|                 0|     1|           2|          2|             2|               4|
|2016-11-01 00:00:00|  E4N 47,465|             50|               78|      0|          252|                 0|     1|           1|         70|             1|               4|
|2016-11-01 00:00:00|  E4N 47,465|             51|               78|      0|          252|                 0|     1|           1|         70|             1|               4|
|2016-11-01 00:00:00|  E4N 47,800|             49|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00|  E4N 47,800|             50|               78|    254|          254|                 0|     2|           2|          2|             2|               4|
|2016-11-01 00:00:00|  E4N 47,800|             51|               78|    254|          254|                 0|     2|           1|         70|             1|               4|
|2016-11-01 00:00:00|  E4N 48,290|             49|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00|  E4N 48,290|             50|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00|  E4N 48,290|             51|               78|      8|           82|                 0|     3|           2|          2|             2|               4|
|2016-11-01 00:00:00|  E4N 48,620|             49|               78|    253|          253|                32|     1|           2|          4|             1|               4|
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
only showing top 10 rows

Data Wrangling

Before we start working with the data we usually need to do some preprocessing and cleanup.

In our example:

  1. Split the Ds_Reference into road name and KM offset
  2. Convert the km reference from the Swedish number format (using ',' instead of '.') to meters
  3. Fix Detector_Number field
  4. Filter out records with errors (e.g., detector not working)

Split Ds_Reference and fix Km

Spark can take a user-defined function created with udf() and execute it on all elements in a column. udf() takes as parameters the user-defined function and the schema of the new column that will be produced.

In [12]:
split_schema = StructType([
  StructField('Road', StringType(), False),
  StructField('Km_Ref', IntegerType(), False)
])

@udf(split_schema)
def split_ds_ref(s):
    try:
        r, km = s.split(' ')
        k, m = km.split(',')
        meter = int(k)*1000 + int(m)
        return r, meter
    except:
        return None

We use .withColumn() to apply our function to the elements of the column. Using the same column name means the contents will be replaced; otherwise a new column will be created.

In [13]:
df_cleanup1 = df_raw.withColumn('Ds_Reference', split_ds_ref('Ds_Reference'))
df_cleanup1.printSchema()
root
 |-- Timestamp: timestamp (nullable = true)
 |-- Ds_Reference: struct (nullable = true)
 |    |-- Road: string (nullable = false)
 |    |-- Km_Ref: integer (nullable = false)
 |-- Detector_Number: short (nullable = true)
 |-- Traffic_Direction: short (nullable = true)
 |-- Flow_In: short (nullable = true)
 |-- Average_Speed: short (nullable = true)
 |-- Sign_Aid_Det_Comms: short (nullable = true)
 |-- Status: short (nullable = true)
 |-- Legend_Group: short (nullable = true)
 |-- Legend_Sign: short (nullable = true)
 |-- Legend_SubSign: short (nullable = true)
 |-- Protocol_Version: string (nullable = true)

In [14]:
df_cleanup1.show(10)
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
|          Timestamp|Ds_Reference|Detector_Number|Traffic_Direction|Flow_In|Average_Speed|Sign_Aid_Det_Comms|Status|Legend_Group|Legend_Sign|Legend_SubSign|Protocol_Version|
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
|2016-11-01 00:00:00| [E4N,47465]|             49|               78|      0|          252|                 0|     1|           2|          2|             2|               4|
|2016-11-01 00:00:00| [E4N,47465]|             50|               78|      0|          252|                 0|     1|           1|         70|             1|               4|
|2016-11-01 00:00:00| [E4N,47465]|             51|               78|      0|          252|                 0|     1|           1|         70|             1|               4|
|2016-11-01 00:00:00| [E4N,47800]|             49|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00| [E4N,47800]|             50|               78|    254|          254|                 0|     2|           2|          2|             2|               4|
|2016-11-01 00:00:00| [E4N,47800]|             51|               78|    254|          254|                 0|     2|           1|         70|             1|               4|
|2016-11-01 00:00:00| [E4N,48290]|             49|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00| [E4N,48290]|             50|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00| [E4N,48290]|             51|               78|      8|           82|                 0|     3|           2|          2|             2|               4|
|2016-11-01 00:00:00| [E4N,48620]|             49|               78|    253|          253|                32|     1|           2|          4|             1|               4|
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
only showing top 10 rows

Fix detector number

The detector number is stored as a character. For example, detector 1 is stored as 49, which is the ASCII code for the character '1'. We fix that as well.

In [15]:
ascii_to_int = udf(lambda x : x - 48, ShortType())
df_cleanup2 = df_cleanup1.withColumn('Detector_Number', ascii_to_int('Detector_Number'))
In [16]:
df_cleanup2.show(10)
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
|          Timestamp|Ds_Reference|Detector_Number|Traffic_Direction|Flow_In|Average_Speed|Sign_Aid_Det_Comms|Status|Legend_Group|Legend_Sign|Legend_SubSign|Protocol_Version|
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
|2016-11-01 00:00:00| [E4N,47465]|              1|               78|      0|          252|                 0|     1|           2|          2|             2|               4|
|2016-11-01 00:00:00| [E4N,47465]|              2|               78|      0|          252|                 0|     1|           1|         70|             1|               4|
|2016-11-01 00:00:00| [E4N,47465]|              3|               78|      0|          252|                 0|     1|           1|         70|             1|               4|
|2016-11-01 00:00:00| [E4N,47800]|              1|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00| [E4N,47800]|              2|               78|    254|          254|                 0|     2|           2|          2|             2|               4|
|2016-11-01 00:00:00| [E4N,47800]|              3|               78|    254|          254|                 0|     2|           1|         70|             1|               4|
|2016-11-01 00:00:00| [E4N,48290]|              1|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00| [E4N,48290]|              2|               78|    253|          253|                32|     1|           2|          4|             1|               4|
|2016-11-01 00:00:00| [E4N,48290]|              3|               78|      8|           82|                 0|     3|           2|          2|             2|               4|
|2016-11-01 00:00:00| [E4N,48620]|              1|               78|    253|          253|                32|     1|           2|          4|             1|               4|
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
only showing top 10 rows

Using SQL with Spark

We can continue using transformations such as select() and filter(), but we can also use SQL.

Spark supports SQL queries, which is very useful for quickly exploring your data. It also makes it simple for new users to start working with Spark.

For example, we can select only the columns that we are interested in and filter out records with errors (status = 3 is a normal record).

To simplify the visualization, we will focus on a small part of the highway network which is E4 highway North direction. So we will filter that as well.

In [17]:
df_cleanup2.createOrReplaceTempView("FlowData")
In [18]:
df_E4N = spark.sql('SELECT Timestamp, Ds_Reference, Detector_Number, Flow_In, Average_Speed ' 
                  'FROM FlowData WHERE Status == 3 AND Ds_Reference.Road == "E4N"')
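For reference, the same query could be expressed with the DataFrame API instead of SQL; a sketch of the equivalent transformation chain (not executed in this notebook):

# Equivalent DataFrame API version of the SQL query above.
df_E4N_api = df_cleanup2 \
    .filter((col('Status') == 3) & (col('Ds_Reference.Road') == 'E4N')) \
    .select('Timestamp', 'Ds_Reference', 'Detector_Number', 'Flow_In', 'Average_Speed')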
In [19]:
%%time 
df_E4N.count()
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 2.19 s
Out[19]:
537740
In [20]:
df_E4N.show(10)
+-------------------+------------+---------------+-------+-------------+
|          Timestamp|Ds_Reference|Detector_Number|Flow_In|Average_Speed|
+-------------------+------------+---------------+-------+-------------+
|2016-11-01 00:00:00| [E4N,48290]|              3|      8|           82|
|2016-11-01 00:00:00| [E4N,48935]|              3|      4|           72|
|2016-11-01 00:00:00| [E4N,49370]|              1|      1|           77|
|2016-11-01 00:00:00| [E4N,49370]|              2|      3|           78|
|2016-11-01 00:00:00| [E4N,49370]|              3|      6|           52|
|2016-11-01 00:00:00| [E4N,50165]|              1|      3|          110|
|2016-11-01 00:00:00| [E4N,50165]|              2|      3|           83|
|2016-11-01 00:00:00| [E4N,50165]|              3|      1|           72|
|2016-11-01 00:00:00| [E4N,50395]|              2|      3|           82|
|2016-11-01 00:00:00| [E4N,50395]|              3|      1|           71|
+-------------------+------------+---------------+-------+-------------+
only showing top 10 rows

Plotting

We can now start exploring the dataset, for example, by plotting a histogram of a column. We will use Spark to calculate the histogram buckets and then use Pandas for plotting.

We extract the column of interest with select() and use the histogram() method on the underlying RDD. We use flatMap to convert each Row object into an int value.

In [21]:
speed_histogram = df_E4N.select('Average_Speed').rdd.flatMap(lambda x: x).histogram(10)
In [22]:
speed_histogram
Out[22]:
([2.0,
  22.7,
  43.4,
  64.1,
  84.8,
  105.5,
  126.19999999999999,
  146.9,
  167.6,
  188.29999999999998,
  209],
 [22686, 31257, 44760, 204690, 201827, 31045, 1316, 132, 23, 4])

Then we use Pandas to plot the figure

In [23]:
pd.DataFrame(list(zip(list(speed_histogram)[0], list(speed_histogram)[1])), \
             columns=['Average Speed','Frequency']).set_index('Average Speed').plot(kind='bar')
Out[23]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fabb61d82e8>

Fundamental Diagrams of Traffic Flow

Traffic Flow Diagram

We will plot the basic traffic flow diagrams for some of the detectors to get some insight into the properties of the traffic on the highway. In particular, we are interested in the maximum flow and the associated density, which we will use to select the thresholds for queue detection.

But first we need to calculate the density and add it as a new column to our dataset.
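
The density here is the hourly flow divided by the average speed: Flow_In is a per-minute count, so it is multiplied by 60 to get vehicles per hour, and dividing by the speed in km/h gives vehicles per km. A quick sanity check against the first record shown a few cells below (Flow_In = 8, Average_Speed = 82):

# Density (vehicles/km) = hourly flow (vehicles/h) / speed (km/h).
flow_per_min, speed_kmh = 8, 82
density = flow_per_min * 60 / speed_kmh  # vehicles per km
print(density)                           # ~5.85, matching the Density column below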

Adding Density Column

In [24]:
df_E4N_D = df_E4N.withColumn('Density', col('Flow_In')*60/col('Average_Speed'))
In [25]:
df_E4N_D.printSchema()
root
 |-- Timestamp: timestamp (nullable = true)
 |-- Ds_Reference: struct (nullable = true)
 |    |-- Road: string (nullable = false)
 |    |-- Km_Ref: integer (nullable = false)
 |-- Detector_Number: short (nullable = true)
 |-- Flow_In: short (nullable = true)
 |-- Average_Speed: short (nullable = true)
 |-- Density: double (nullable = true)

In [26]:
df_E4N_D.show(20)
+-------------------+------------+---------------+-------+-------------+------------------+
|          Timestamp|Ds_Reference|Detector_Number|Flow_In|Average_Speed|           Density|
+-------------------+------------+---------------+-------+-------------+------------------+
|2016-11-01 00:00:00| [E4N,48290]|              3|      8|           82| 5.853658536585366|
|2016-11-01 00:00:00| [E4N,48935]|              3|      4|           72|3.3333333333333335|
|2016-11-01 00:00:00| [E4N,49370]|              1|      1|           77|0.7792207792207793|
|2016-11-01 00:00:00| [E4N,49370]|              2|      3|           78|2.3076923076923075|
|2016-11-01 00:00:00| [E4N,49370]|              3|      6|           52| 6.923076923076923|
|2016-11-01 00:00:00| [E4N,50165]|              1|      3|          110|1.6363636363636365|
|2016-11-01 00:00:00| [E4N,50165]|              2|      3|           83|2.1686746987951806|
|2016-11-01 00:00:00| [E4N,50165]|              3|      1|           72|0.8333333333333334|
|2016-11-01 00:00:00| [E4N,50395]|              2|      3|           82|2.1951219512195124|
|2016-11-01 00:00:00| [E4N,50395]|              3|      1|           71|0.8450704225352113|
|2016-11-01 00:00:00| [E4N,50570]|              2|      2|           80|               1.5|
|2016-11-01 00:00:00| [E4N,50570]|              3|      6|           80|               4.5|
|2016-11-01 00:00:00| [E4N,50890]|              2|      1|           89|0.6741573033707865|
|2016-11-01 00:00:00| [E4N,50890]|              3|      3|           79| 2.278481012658228|
|2016-11-01 00:00:00| [E4N,51085]|              2|      3|           66| 2.727272727272727|
|2016-11-01 00:00:00| [E4N,51370]|              2|      3|           75|               2.4|
|2016-11-01 00:00:00| [E4N,51630]|              2|      1|           89|0.6741573033707865|
|2016-11-01 00:00:00| [E4N,51895]|              1|      1|           79| 0.759493670886076|
|2016-11-01 00:00:00| [E4N,51895]|              2|      1|           85|0.7058823529411765|
|2016-11-01 00:00:00| [E4N,52220]|              2|      2|           34|3.5294117647058822|
+-------------------+------------+---------------+-------+-------------+------------------+
only showing top 20 rows

Plotting Traffic Flow Diagrams

We select one detector and plot its values. In this example we use detector 'E4N 26,050' in the rightmost lane.

In [27]:
df = df_E4N_D.filter((col('Ds_Reference.Road') == 'E4N') & \
                     (col('Ds_Reference.Km_Ref') == 26050) & \
                     (col('Detector_Number') == 1))
In [28]:
df.count()
Out[28]:
1157
In [29]:
pdf = df.toPandas()
In [30]:
pdf.plot.scatter(x = 'Density', y = 'Average_Speed', c= pdf['Timestamp'].dt.hour, colormap='Blues', colorbar=False)
Out[30]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fabac794eb8>
In [31]:
pdf.plot.scatter(x = 'Average_Speed', y = 'Flow_In', c= pdf['Timestamp'].dt.hour, colormap='Blues', colorbar=False )
Out[31]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fabac773240>
In [32]:
pdf.plot.scatter(x = 'Density', y = 'Flow_In', c= pdf['Timestamp'].dt.hour, legend=True, colormap='Blues', colorbar=False )
Out[32]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fabac7332e8>

We plot the same diagrams for another detector, which lies in a more congested area.

In [33]:
pdf2 = df_E4N_D.filter((col('Ds_Reference.Road') == 'E4N') & \
                     (col('Ds_Reference.Km_Ref') == 68960) & \
                     (col('Detector_Number') == 1)) \
                     .toPandas()
In [34]:
pdf2.plot.scatter(x = 'Density', y = 'Average_Speed', c= pdf2['Timestamp'].dt.hour, colormap='Blues', colorbar=False)
Out[34]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fabac55ec18>
In [35]:
pdf2.plot.scatter(x = 'Average_Speed', y = 'Flow_In', c= pdf2['Timestamp'].dt.hour, colormap='Blues', colorbar=False)
Out[35]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fabac52df98>
In [36]:
pdf2.plot.scatter(x = 'Density', y = 'Flow_In', c= pdf2['Timestamp'].dt.hour, colormap='Blues', colorbar=False)
Out[36]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fabac453390>

End of Queue Detection

Queue Sign

To detect the end of queues we will use the density field we calculated. The end of the queue should be close to the detector that experienced a sudden increase in density compared to the density in the previous minute. The end of a queue is considered an accident hazard on highways.

To calculate the difference in density we will use the Window function in Spark. A window partitions and sorts the data into groups, and we can then run a computation on the records in these groups (windows). In our example, we partition the data by the detector's unique ID (Ds_Reference + Detector_Number), so that all the data for one detector is grouped in a window, and then sort each window by the timestamp.

In [37]:
w = Window.partitionBy('Ds_Reference', 'Detector_Number').orderBy('Timestamp')

Then we define two functions over these windows to calculate the density difference and the time difference. We use lag() to access the previous record in the sorted window.

In [38]:
densDiff = col('Density')- lag('Density', 1).over(w)
timeFmt = 'yyyy-MM-dd HH:mm:ss.SSS'
timeDiff = unix_timestamp('Timestamp', format=timeFmt) - lag(unix_timestamp('Timestamp', format=timeFmt)).over(w)

We use these functions to create two new columns in our dataset.

In [39]:
df_diff = df_E4N_D.withColumn('Density_Diff', densDiff).withColumn('timeDiff', timeDiff)
In [40]:
df_diff.show(20)
+-------------------+------------+---------------+-------+-------------+------------------+--------------------+--------+
|          Timestamp|Ds_Reference|Detector_Number|Flow_In|Average_Speed|           Density|        Density_Diff|timeDiff|
+-------------------+------------+---------------+-------+-------------+------------------+--------------------+--------+
|2016-11-01 00:00:00| [E4N,30710]|              2|      1|           89|0.6741573033707865|                null|    null|
|2016-11-01 00:01:00| [E4N,30710]|              2|      1|          108|0.5555555555555556|-0.11860174781523092|      60|
|2016-11-01 00:02:00| [E4N,30710]|              2|      2|          105|1.1428571428571428|  0.5873015873015872|      60|
|2016-11-01 00:04:00| [E4N,30710]|              2|      2|           96|              1.25|  0.1071428571428572|     120|
|2016-11-01 00:05:00| [E4N,30710]|              2|      3|          103|1.7475728155339805|  0.4975728155339805|      60|
|2016-11-01 00:06:00| [E4N,30710]|              2|      1|           83|0.7228915662650602| -1.0246812492689203|      60|
|2016-11-01 00:07:00| [E4N,30710]|              2|      2|           99|1.2121212121212122|  0.4892296458561519|      60|
|2016-11-01 00:09:00| [E4N,30710]|              2|      1|          100|               0.6| -0.6121212121212122|     120|
|2016-11-01 00:10:00| [E4N,30710]|              2|      1|          100|               0.6|                 0.0|      60|
|2016-11-01 00:12:00| [E4N,30710]|              2|      3|          107|1.6822429906542056|  1.0822429906542057|     120|
|2016-11-01 00:13:00| [E4N,30710]|              2|      1|           88|0.6818181818181818| -1.0004248088360237|      60|
|2016-11-01 00:14:00| [E4N,30710]|              2|      2|          127|0.9448818897637795| 0.26306370794559775|      60|
|2016-11-01 00:16:00| [E4N,30710]|              2|      3|          104|1.7307692307692308|  0.7858873410054513|     120|
|2016-11-01 00:17:00| [E4N,30710]|              2|      1|           88|0.6818181818181818| -1.0489510489510492|      60|
|2016-11-01 00:18:00| [E4N,30710]|              2|      1|           96|             0.625|-0.05681818181818177|      60|
|2016-11-01 00:19:00| [E4N,30710]|              2|      2|          138|0.8695652173913043| 0.24456521739130432|      60|
|2016-11-01 00:20:00| [E4N,30710]|              2|      1|           99|0.6060606060606061|-0.26350461133069825|      60|
|2016-11-01 00:21:00| [E4N,30710]|              2|      2|          108|1.1111111111111112|  0.5050505050505051|      60|
|2016-11-01 00:22:00| [E4N,30710]|              2|      4|          100|               2.4|  1.2888888888888888|      60|
|2016-11-01 00:25:00| [E4N,30710]|              2|      1|           78|0.7692307692307693| -1.6307692307692307|     180|
+-------------------+------------+---------------+-------+-------------+------------------+--------------------+--------+
only showing top 20 rows

Because we filtered the original dataset to contain only normal records (we removed records with error codes such as faulty detector or empty highway), some of the records in the window we defined might be more than a minute apart. We filter these records out using the timeDiff field we calculated.

In [41]:
df_diff_min = df_diff.filter(col('timeDiff') == 60)
In [42]:
df_diff_min.show(20)
+-------------------+------------+---------------+-------+-------------+------------------+--------------------+--------+
|          Timestamp|Ds_Reference|Detector_Number|Flow_In|Average_Speed|           Density|        Density_Diff|timeDiff|
+-------------------+------------+---------------+-------+-------------+------------------+--------------------+--------+
|2016-11-01 00:01:00| [E4N,30710]|              2|      1|          108|0.5555555555555556|-0.11860174781523092|      60|
|2016-11-01 00:02:00| [E4N,30710]|              2|      2|          105|1.1428571428571428|  0.5873015873015872|      60|
|2016-11-01 00:05:00| [E4N,30710]|              2|      3|          103|1.7475728155339805|  0.4975728155339805|      60|
|2016-11-01 00:06:00| [E4N,30710]|              2|      1|           83|0.7228915662650602| -1.0246812492689203|      60|
|2016-11-01 00:07:00| [E4N,30710]|              2|      2|           99|1.2121212121212122|  0.4892296458561519|      60|
|2016-11-01 00:10:00| [E4N,30710]|              2|      1|          100|               0.6|                 0.0|      60|
|2016-11-01 00:13:00| [E4N,30710]|              2|      1|           88|0.6818181818181818| -1.0004248088360237|      60|
|2016-11-01 00:14:00| [E4N,30710]|              2|      2|          127|0.9448818897637795| 0.26306370794559775|      60|
|2016-11-01 00:17:00| [E4N,30710]|              2|      1|           88|0.6818181818181818| -1.0489510489510492|      60|
|2016-11-01 00:18:00| [E4N,30710]|              2|      1|           96|             0.625|-0.05681818181818177|      60|
|2016-11-01 00:19:00| [E4N,30710]|              2|      2|          138|0.8695652173913043| 0.24456521739130432|      60|
|2016-11-01 00:20:00| [E4N,30710]|              2|      1|           99|0.6060606060606061|-0.26350461133069825|      60|
|2016-11-01 00:21:00| [E4N,30710]|              2|      2|          108|1.1111111111111112|  0.5050505050505051|      60|
|2016-11-01 00:22:00| [E4N,30710]|              2|      4|          100|               2.4|  1.2888888888888888|      60|
|2016-11-01 00:32:00| [E4N,30710]|              2|      3|          112|1.6071428571428572|  0.1555299539170507|      60|
|2016-11-01 00:36:00| [E4N,30710]|              2|      2|           92|1.3043478260869565|  0.1505016722408028|      60|
|2016-11-01 00:42:00| [E4N,30710]|              2|      1|           94|0.6382978723404256| 0.13409619166815667|      60|
|2016-11-01 00:43:00| [E4N,30710]|              2|      1|           87|0.6896551724137931|0.051357300073367584|      60|
|2016-11-01 00:44:00| [E4N,30710]|              2|      2|          106|1.1320754716981132| 0.44242029928432003|      60|
|2016-11-01 00:47:00| [E4N,30710]|              2|      2|          109|1.1009174311926606| 0.46261955885223505|      60|
+-------------------+------------+---------------+-------+-------------+------------------+--------------------+--------+
only showing top 20 rows

Now that we have calculated the density difference, all that is left is to find the detectors that cross the thresholds we define.

In [43]:
df_queue = df_diff_min.filter( (col('Density') > 30) & (col('Density_Diff') > 20) )
In [44]:
df_queue.count()
Out[44]:
5470
In [45]:
df_queue.show(20)
+-------------------+------------+---------------+-------+-------------+------------------+------------------+--------+
|          Timestamp|Ds_Reference|Detector_Number|Flow_In|Average_Speed|           Density|      Density_Diff|timeDiff|
+-------------------+------------+---------------+-------+-------------+------------------+------------------+--------+
|2016-11-01 05:18:00| [E4N,54630]|              3|     16|           11| 87.27272727272727|  50.2139037433155|      60|
|2016-11-01 05:20:00| [E4N,54630]|              3|     20|           17| 70.58823529411765| 21.49732620320856|      60|
|2016-11-01 05:25:00| [E4N,54630]|              3|     21|           18|              70.0|              43.0|      60|
|2016-11-01 05:35:00| [E4N,54630]|              3|     22|           24|              55.0| 23.96551724137931|      60|
|2016-11-01 05:38:00| [E4N,54630]|              3|     20|           12|             100.0| 53.63636363636363|      60|
|2016-11-01 05:57:00| [E4N,54630]|              3|     25|           34| 44.11764705882353|23.629842180774748|      60|
|2016-11-01 06:04:00| [E4N,54630]|              3|     15|           10|              90.0| 42.41379310344828|      60|
|2016-11-01 06:21:00| [E4N,54630]|              3|     20|           13|  92.3076923076923|57.468982630272954|      60|
|2016-11-01 06:37:00| [E4N,54630]|              3|     23|           25|              55.2|21.450000000000003|      60|
|2016-11-01 07:06:00| [E4N,54630]|              3|     19|           20|              57.0|             23.25|      60|
|2016-11-01 07:34:00| [E4N,54630]|              3|     17|           16|             63.75|             36.25|      60|
|2016-11-01 07:38:00| [E4N,54630]|              3|     24|           25|              57.6|            21.975|      60|
|2016-11-01 07:45:00| [E4N,54630]|              3|     16|           10|              96.0|              56.0|      60|
|2016-11-01 07:56:00| [E4N,54630]|              3|     14|            6|             140.0|              92.0|      60|
|2016-11-01 08:28:00| [E4N,54630]|              3|     17|            9|113.33333333333333| 67.61904761904762|      60|
|2016-11-01 08:36:00| [E4N,54630]|              3|     15|           16|             56.25|23.942307692307693|      60|
|2016-11-01 08:37:00| [E4N,54630]|              3|     19|           10|             114.0|             57.75|      60|
|2016-11-01 08:43:00| [E4N,54630]|              3|     22|           20|              66.0|              21.0|      60|
|2016-11-01 08:47:00| [E4N,54630]|              3|     21|           26| 48.46153846153846|21.032967032967033|      60|
|2016-11-01 08:54:00| [E4N,54630]|              3|     20|           26| 46.15384615384615|23.653846153846153|      60|
+-------------------+------------+---------------+-------+-------------+------------------+------------------+--------+
only showing top 20 rows

Visualization

Now that we have found the queue ends (in the df_queue DataFrame), it is useful to visualize them on a map and see them in action. We will use a map widget from ipyleaflet to plot the geographic locations and a text box to show the time.

In [46]:
# Uncomment the map you prefer from the ones below, or add your preferred map source
# Comment all and remove default_tiles to use the default map
#url = 'https://{s}.tile.openstreetmap.fr/hot/{z}/{x}/{y}.png'
url = 'http://a.tile.basemaps.cartocdn.com/light_all/{z}/{x}/{y}.png'
#url = 'http://{s}.tile.stamen.com/toner/{z}/{x}/{y}.png'

provider = TileLayer(url=url, opacity=1)
myMap = Map(default_tiles=provider, center=[59.334591, 18.063240], zoom=12, layout=widgets.Layout(width='130%', height='1000px'))
myMap.layout.height = '600px'
myMap.layout.width = '100%'
In [47]:
myMap
In [48]:
wgt = widgets.Text(value='Timestamp!', disabled=True)
display(wgt)

Detector Metadata

To plot the data on the map we need the GPS coordinates of the detectors. This metadata exists in a separate geographic information system (GIS). We exported the data into a CSV file and converted the coordinates from the Swedish SWEREF99 standard to the GPS WGS84 standard for easier plotting on the map.
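
The conversion itself was done outside this notebook. As an illustration only, a coordinate pair could be converted from SWEREF99 TM (EPSG:3006) to WGS84 (EPSG:4326) with the pyproj library; the easting/northing values below are made up.

from pyproj import Transformer

# SWEREF99 TM -> WGS84; always_xy=True keeps (easting, northing) / (lon, lat) order
transformer = Transformer.from_crs('EPSG:3006', 'EPSG:4326', always_xy=True)
lon, lat = transformer.transform(674032.0, 6580822.0)  # hypothetical coordinates
print(lat, lon)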

Then we load the metadata into Spark.

In [49]:
schema_metadata = StructType() \
        .add('Y', DoubleType(), False) \
        .add('X', DoubleType(), False) \
        .add('DetectorId', ShortType(), False) \
        .add('McsDetecto', ShortType(), False) \
        .add('McsDsRefer', StringType(), False) \
        .add('LaneId', ShortType(), False) \
        .add('Bearing', ShortType(), True) \
        .add('Location', StringType(), True) \
        .add('RegionId', ShortType(), False) \
        .add('Entreprene', StringType(), True) \
        .add('StationId', ShortType(), False) \
        .add('SiteId', ShortType(), False) \
        .add('SiteValidF', TimestampType(), False) \
        .add('SiteValidT', TimestampType(), False) \
        .add('DetectorVa', TimestampType(), False) \
        .add('Detector_1', TimestampType(), False)
In [50]:
df_metadata_raw = spark.read.csv('data/sample_E4N_Metadata.csv', sep=';', schema=schema_metadata, \
                    ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, \
                    header=True, \
                    timestampFormat='yyyy/MM/dd HH:mm:ss.SSS')

We split the Ds_Reference field like we did with the flow data.

In [51]:
df_metadata = df_metadata_raw.withColumn('McsDsRefer', split_ds_ref(col('McsDsRefer')))
df_metadata.printSchema()
root
 |-- Y: double (nullable = true)
 |-- X: double (nullable = true)
 |-- DetectorId: short (nullable = true)
 |-- McsDetecto: short (nullable = true)
 |-- McsDsRefer: struct (nullable = true)
 |    |-- Road: string (nullable = false)
 |    |-- Km_Ref: integer (nullable = false)
 |-- LaneId: short (nullable = true)
 |-- Bearing: short (nullable = true)
 |-- Location: string (nullable = true)
 |-- RegionId: short (nullable = true)
 |-- Entreprene: string (nullable = true)
 |-- StationId: short (nullable = true)
 |-- SiteId: short (nullable = true)
 |-- SiteValidF: timestamp (nullable = true)
 |-- SiteValidT: timestamp (nullable = true)
 |-- DetectorVa: timestamp (nullable = true)
 |-- Detector_1: timestamp (nullable = true)

Plotting the Detectors on the Map

To verify the metadata we will plot all detectors on the map. We extract the coordinates from the Spark DataFrame into a local Python list, then plot them.

In [52]:
coord = df_metadata.select('Y','X').collect()

We convert the coordinates into GeoJSON polygons (small triangles) that can be plotted on the map.

In [53]:
features = []
for pos in coord:
    p = MultiPolygon([ \
                      ([(pos['X'], pos['Y']), \
                        (pos['X']+0.000001, pos['Y']+0.000001), \
                        (pos['X']-0.000001, pos['Y']+0.000001), \
                        (pos['X'], pos['Y'])],) \
                     ])

    features.append(Feature(geometry=p, \
                    properties={'style':{'color': '#0000cd', 'fillColor': '#0000cd', 'fillOpacity': 1.0, 'weight': 6}}))

data = FeatureCollection(features)
g = GeoJSON(data=data)
    

We can check one of the points

In [54]:
data['features'][2]
Out[54]:
{"geometry": {"coordinates": [[[[17.6419855357688, 59.179808900665], [17.6419865357688, 59.179809900665], [17.6419845357688, 59.179809900665], [17.6419855357688, 59.179808900665]]]], "type": "MultiPolygon"}, "properties": {"style": {"color": "#0000cd", "fillColor": "#0000cd", "fillOpacity": 1.0, "weight": 6}}, "type": "Feature"}

And verify that the GeoJSON we created is valid

In [55]:
data.is_valid
Out[55]:
True

Add the data to the map. Scroll up and you should see the locations of the detectors plotted on the map.

In [56]:
myMap.add_layer(g)

When you are done, remove the detector layer from the map to clean it up.

In [57]:
myMap.remove_layer(g)

Add Coordinates to Queue Data

Now that we have the coordinates of the detectors, we need to add them to the queues we found so we can plot them. For that we use the join operation.

In [58]:
df_queue_coord = df_queue.alias('a').join(df_metadata.alias('b'), (col('a.Ds_Reference') == col('b.McsDsRefer')) \
                                           & (col('a.Detector_Number') == col('b.LaneId')))
In [59]:
df_queue_coord.printSchema()
root
 |-- Timestamp: timestamp (nullable = true)
 |-- Ds_Reference: struct (nullable = true)
 |    |-- Road: string (nullable = false)
 |    |-- Km_Ref: integer (nullable = false)
 |-- Detector_Number: short (nullable = true)
 |-- Flow_In: short (nullable = true)
 |-- Average_Speed: short (nullable = true)
 |-- Density: double (nullable = true)
 |-- Density_Diff: double (nullable = true)
 |-- timeDiff: long (nullable = true)
 |-- Y: double (nullable = true)
 |-- X: double (nullable = true)
 |-- DetectorId: short (nullable = true)
 |-- McsDetecto: short (nullable = true)
 |-- McsDsRefer: struct (nullable = true)
 |    |-- Road: string (nullable = false)
 |    |-- Km_Ref: integer (nullable = false)
 |-- LaneId: short (nullable = true)
 |-- Bearing: short (nullable = true)
 |-- Location: string (nullable = true)
 |-- RegionId: short (nullable = true)
 |-- Entreprene: string (nullable = true)
 |-- StationId: short (nullable = true)
 |-- SiteId: short (nullable = true)
 |-- SiteValidF: timestamp (nullable = true)
 |-- SiteValidT: timestamp (nullable = true)
 |-- DetectorVa: timestamp (nullable = true)
 |-- Detector_1: timestamp (nullable = true)

Visualize Queue Locations

We extract the data we need from the joined table in Spark into a local Pandas DataFrame, then loop through it, plot the queue ends we found on the map, and show the time in the text box.

In [60]:
pdf_queue_coord = df_queue_coord.select('Timestamp', 'X', 'Y', 'Density').toPandas()

We can use the timestamp to get the queues at a specific time and the GPS coordinates to plot them. For example:

In [61]:
pdf_queue_coord[pdf_queue_coord['Timestamp'] == datetime(2016,11,1,6,57,0)]
Out[61]:
Timestamp X Y Density
890 2016-11-01 06:57:00 18.004614 59.296446 74.117647
1002 2016-11-01 06:57:00 18.018993 59.304314 110.000000
1453 2016-11-01 06:57:00 17.992287 59.290003 92.000000
1725 2016-11-01 06:57:00 18.004565 59.296465 77.142857
2140 2016-11-01 06:57:00 18.006407 59.297673 112.500000
2414 2016-11-01 06:57:00 18.004516 59.296484 52.500000
2581 2016-11-01 06:57:00 18.010011 59.329998 66.000000
4734 2016-11-01 06:57:00 18.010280 59.307919 85.000000

We define some helper functions that we will use in plotting.

Remove all layers on the map except the map tiles.

In [62]:
def map_cleanup(m):
    for l in m.layers:
        if type(l) != TileLayer:
            m.remove_layer(l)
            l.close()

Get all layers except the map tiles.

In [63]:
def map_get_layers(m):
    layers = []
    for l in m.layers:
        if type(l) != TileLayer:
            layers.append(l)
    return layers

Remove a given list of layers.

In [64]:
def map_cleanup_Layers(m, layers):
    for l in layers:
        m.remove_layer(l)
        l.close()

Plots coordinates in a Pandas DataFrame on the map.

In [65]:
def plot_pandas(pdf, m, color='#191970', opacity=0.5):
    features = []
    for index, row in pdf.iterrows():
        p = MultiPolygon([ \
                        ([(row['X'], row['Y']), \
                        (row['X']+0.0005, row['Y']+0.0005), \
                        (row['X']-0.0005, row['Y']+0.0005), \
                        (row['X'], row['Y'])],) \
                        ])

        features.append(Feature(geometry=p, \
                    properties={'style':{'color': color, 'fillColor': color, 'fillOpacity':opacity, 'weight': 5}}))

    data = FeatureCollection(features)
    g = GeoJSON(data=data)
    m.add_layer(g)
    

We loop through the data minute by minute from the start time to the end time. To improve the visualization, we plot the current minute in dark blue as well as the previous 3 minutes in lighter blue shades.

In [73]:
# Start Time
st = datetime(2016,11,1,7,0,0)

# End Time
et = datetime(2016,11,1,23,59,59)

# Time Delta
td = timedelta(minutes=1)

t1 = st
t2 = t1 + td
t3 = t2 + td
t4 = t3 + td

while t4 <= et:
    l = map_get_layers(myMap)
    plot_pandas(pdf_queue_coord[pdf_queue_coord['Timestamp'] == t1], myMap, '#e0ffff', opacity=0.7)
    plot_pandas(pdf_queue_coord[pdf_queue_coord['Timestamp'] == t2], myMap, '#b0c4de', opacity=0.8)
    plot_pandas(pdf_queue_coord[pdf_queue_coord['Timestamp'] == t3], myMap, '#6495ed', opacity=0.9)
    plot_pandas(pdf_queue_coord[pdf_queue_coord['Timestamp'] == t4], myMap, '#000080', opacity=1.0)
    
    # Update time of the Text Box
    wgt.value = t4.ctime()
    
    # You may break here to view a snapshot at a specific time instead of animation. Set st to the time you want to check
    break
    
    map_cleanup_Layers(myMap, l)
    # Change the animation speed
    time.sleep(0.25)
    t1 = t1 + td
    t2 = t2 + td
    t3 = t3 + td
    t4 = t4 + td
    

Clean up the map

In [67]:
map_cleanup(myMap)

Next Steps

1) Parquet Data Format vs. Original CSV

Parquet files are much more efficient than CSV files because they store data in binary and columnar format. Read this for example.

You can test this yourself. We save our cleaned-up data in Parquet format, then use it to load our E4N data. Compare the times!

In [68]:
%%time
df_cleanup2.write.save('data/sample_E4N.parquet', format='parquet')
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 3.29 s
In [69]:
%%time
df_E4N_test = spark.read.parquet('data/sample_E4N.parquet').select('Timestamp', 'Ds_Reference', 'Detector_Number', 'Flow_In', 'Average_Speed').where('Status == 3 AND Ds_Reference.Road == "E4N"')
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 190 ms
In [70]:
%%time
df_E4N_test.count()
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 666 ms
Out[70]:
537740
In [71]:
%%time
df_E4N_test.show(10)
+-------------------+------------+---------------+-------+-------------+
|          Timestamp|Ds_Reference|Detector_Number|Flow_In|Average_Speed|
+-------------------+------------+---------------+-------+-------------+
|2016-11-01 19:46:00| [E4N,57195]|              1|     17|           71|
|2016-11-01 19:46:00| [E4N,57195]|              2|     14|           56|
|2016-11-01 19:46:00| [E4N,57305]|              1|     17|           65|
|2016-11-01 19:46:00| [E4N,57305]|              2|     16|           56|
|2016-11-01 19:46:00| [E4N,57420]|              1|     15|           66|
|2016-11-01 19:46:00| [E4N,57420]|              2|     14|           57|
|2016-11-01 19:46:00| [E4N,57730]|              1|      9|           81|
|2016-11-01 19:46:00| [E4N,57730]|              2|     12|           65|
|2016-11-01 19:46:00| [E4N,58010]|              1|      7|           73|
|2016-11-01 19:46:00| [E4N,58010]|              2|     16|           61|
+-------------------+------------+---------------+-------+-------------+
only showing top 10 rows

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 87.9 ms
In [72]:
df_E4N_test.printSchema()
root
 |-- Timestamp: timestamp (nullable = true)
 |-- Ds_Reference: struct (nullable = true)
 |    |-- Road: string (nullable = true)
 |    |-- Km_Ref: integer (nullable = true)
 |-- Detector_Number: short (nullable = true)
 |-- Flow_In: short (nullable = true)
 |-- Average_Speed: short (nullable = true)

Check also the data size on disk. In my case the CSV sample data was 40 MB while the Parquet file we saved was 1.4 MB!

2) Stream Processing Using Kafka and Spark Structured Streaming

The above method for queue detection can be easily converted into a streaming program using the new Spark Structured Streaming. For further reading on that, see this tutorial to get started.
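
As a rough sketch of what the ingestion side could look like, assuming a Kafka broker on localhost:9092, a hypothetical topic 'mcs-flow' carrying the raw detector records, and the spark-sql-kafka package on the classpath:

# Read the detector records as a stream from Kafka (sketch, assumptions above).
stream_raw = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'mcs-flow') \
    .load()

# The Kafka value column is binary; cast it to a string before parsing the fields.
stream_lines = stream_raw.selectExpr('CAST(value AS STRING) AS line')

# During development, print the incoming records to the console.
query = stream_lines.writeStream \
    .outputMode('append') \
    .format('console') \
    .start()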

3) Graph Processing

Knowing the topological relationships between detectors is needed for more complicated analytics: for example, knowing the next/previous detector on the same lane, or the detectors before/after a lane split/merge.

One way to achieve this is by constructing a graph linking detectors together. With this graph, we can use graph processing to do more advanced analytics. For example, we can detect queue ends better by looking at multiple consecutive sensors on the same lane. A sketch of the idea follows below.
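
A minimal sketch, assuming the graphframes package is installed and that the detector topology is available from some external source (it is not part of this dataset); the edge rows below are made up.

from graphframes import GraphFrame

# Vertices: one row per detector, identified by its DetectorId.
vertices = df_metadata.selectExpr('DetectorId AS id', 'McsDsRefer', 'LaneId')

# Edges: hypothetical links from each detector to the next one downstream on the same lane.
edges = spark.createDataFrame(
    [(101, 102, 'next'), (102, 103, 'next')],   # made-up detector ids
    ['src', 'dst', 'relationship'])

g = GraphFrame(vertices, edges)
g.find('(a)-[e]->(b)').show()   # pairs of directly connected detectors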
