Vehicular interaction and movement for simple traffic configurations

  • We are interested in the dynamics between different types of vehicles on a grid.
  • Particularly, we would like to investigate the interaction between the speeds and general movement of cars (fast vehicles) and bicyles (slower vehicles).
  • Our research question: can increased volume of bicycles (slower vehicles) in a traffic system increase its total throughput?

Model design

Here we describe the design of our system and the assumptions made on its outset. In general, our model contains both deterministic rules and stochastic processes.


Assumptions

In general we take for granted the general assumptions of cellular automata:

  • Topology is a flat, 2-dimensional grid,
    • Time is discrete in nature and nothing interesting happens between timesteps,

Our model specific assumptions are as follows:

  1. There are no collisions. We expect drivers to be responsible and always make choices to slow down and wait their turn at intersections.
  2. Destination is irrelevant. We are not concerned with where vehicles start/exit the system. Hence, decisions with regard to how a vehicles move at junction points are handled with randomness.
  3. Cells can take on 5 separate states:
    • Car $(C)$: A fast vehicle that moves each timestep if the rules allow it.
    • Bicycle $(B)$: A slow vehicle that moves every $b$ timesteps relative to the time it entered the system.
    • Empty $(E)$: Grass/division between roads. Cells marked $(E)$ stay that way for the entire simulation.
    • Road $(R)$: An empty cell that a vehicle can occupy.
    • Intersection $(I)$: A 4-way stop sign at which vehicles must stop. Intersections act in a first in, first out manner meaning each vehicle must wait its turn to move. Bicycles $(B)$ may move at its turn regardless of whether or not the it is a $b$th timestep.
  4. Passing is allowable. Cars may pass bicycles if they have a specific neighborhood, described below.

Rules

Our rules are fairly involved. This is because vehicles can pass eachother. In addition, there is a special case for intersection nodes on the system.

Final Output

Example with the bicycle density roughly of Davis, CA

<img src="traffic_model-b2c-0166.gif",width=900,height=600>

In [8]:
type2name = {"I":-1, "E":0, "R":1, "C":2}
In [9]:
# necessary imports
import numpy as np
import random
import sys, os
import imageio
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib import colors
from matplotlib import rc
from glob import glob
import numpy as np
In [10]:
# necessary for model but not pertinent to its description
coin_flip = lambda p: random.random() < p # flip a biased coin with prob. p of turning up heads
place = "figs/traffic_gif/"
In [ ]:
class Intersection:
    def __init__(self, coordinates):
        """
        The logical description of a 4 way intersection, aka submatrices
        in the grid of the form:
        
                [0][1][1][0]
                [1][1][1][1]
                [1][1][1][1]
                [0][1][1][0]
        
        args:
            Intersections accept a single constructor parameter, coordinates.
            This is a coordinate tuple (i,j) that represents the coordinate of the
            top left corner of the above matrix.
        
        attributes:
            From the coordinates parameter, we infer 2 attributes: i
                1) incoming directions from the north, south, east, and west directions.
                2) outgoing directions  to  the north, south, east, and west
            
            Intersections have a final attribute called `queue`, this is the FIFO structure
            that 
        """
        i, j = self.i, self.j = coordinates

        # compute the neighborhood of coordinates
        # for this intersection
        self.nsew = {"N":(i,   j+1),
                     "S":(i+3, j+2),
                     "E":(i+1, j+3),
                     "W":(i+2, j)}

        self.potential_locs = {"N":[(i+1,j),
                                    (i+3,j+1),
                                    (i+2,j+3)],
                               "S":[(i+1,j),
                                    (i+2,j+3),
                                    (i,  j+2)],
                               "E":[(i+1,j),
                                    (i,  j+2),
                                    (i+3,j+1)],
                               "W":[(i,  j+2),
                                    (i+2,j+3),
                                    (i+3,j+1)]}
        self.queue = []

    def __str__(self):
        return "Intersection @ ({},{})".format(self.i,self.j)

    def __repr__(self):
        return str(self)

def place_intersections(G):
    """
    - Place intersections on the grid G.
    - Intersections are placed at any place 4x4 subsection of G of the following forms:

        [0][1][1][0]  (1)
        [1][1][1][1]
        [1][1][1][1]
        [0][1][1][0],

    The function for now only support intersections of type (1).

    args:
        :NxN grid G
    returns:
        :the intersection placed grid
    """
    intersection_regime = np.array([[0,1,1,0],
                               [1,1,1,1],
                               [1,1,1,1],
                               [0,1,1,0]])
    intersection_i, intersection_j = np.where(find_regime(intersection_regime, G) == 1)
    intersections = []
    for i,j in zip(intersection_i, intersection_j):
        G[i+1,j+1] = -1
        G[i+2,j+2] = -1
        G[i+1,j+2] = -1
        G[i+2,j+1] = -1
        intersections.append(Intersection((i,j)))
        #Is = [i+1, i+2, i+1, i+2]
        #Js = [j+1, j+2, j+1, j+2]
        #G[(Is,Js)] = -1

    return intersections, G


def find_regime(submatrix, grid):
    """
    convolve throughout grid and return masks of indices that describe matches in
    the grid to the specified submatrix

    args:
        :submatrix -   2d array of dimensions n x m
        :grid      -   2d array symbolizing the world of dimensions N x M
    returns:
        :masks that describe matches of the regime in the grid
    """
    n, m = submatrix.shape
    N, M = grid.shape
    convolve_mat = np.zeros((N-n+1, M-m+1))

    for row in range(N - n + 1):
        for col in range(M - m + 1):
            view = grid[row:row + n, col:col + m]
            if np.array_equal(view, submatrix):
                convolve_mat[row, col] = 1
            else:
                convolve_mat[row, col] = 0

    return convolve_mat


class Passing_regime:
    def __init__(self, direction, coordinates):
        i, j = self.i, self.j = coordinates

        if direction == 'Ebound':
            self.regime = np.array([[1,1,1,1],
                                  [2,3,1,1]])
            self.old_loc = (i+1, j)

            self.new_loc = (i+1, j+3)

        elif direction == 'Wbound':
            self.regime = np.array([[1,1,3,2],
                                  [1,1,1,1]])
            self.old_loc = (i,   j+3)

            self.new_loc = (i,   j)

        elif direction == 'Nbound':
            self.regime = np.array([[1,1],
                                  [1,1],
                                  [1,3],
                                  [1,2]])
            self.old_loc = (i+3,  j+1)

            self.new_loc = (i,   j+1)

        elif direction == 'Sbound':
            self.regime = np.array([[2,1],
                                  [3,1],
                                  [1,1],
                                  [1,1]])
            self.old_loc = (i,   j)

            self.new_loc = (i+3, j)

    def __str__(self):
        return "Car can pass bike @ ({},{})".format(self.old_loc[0],self.old_loc[1])

    def __repr__(self):
        return str(self)



def lay_roads(N_traffic, W_traffic, G):
    """
    args:
        N_traffic: (X length list) of x coords to roads
        W_traffic: (Y length list) of y coords to roads
    returns:
        :(dict) maps from direction in {N,S,E,W} to
         traffic incoming coordinates from N,S,E,W respectively
    """
    x,y = G.shape
    S_traffic = [i+1 for i in N_traffic]
    E_traffic = [j-1 for j in W_traffic]

    # laying roads
    for i1,i2 in zip(N_traffic,S_traffic):
        G[:,i1] = 1
        G[:,i2] = 1

    for j1,j2 in zip(W_traffic,E_traffic):
        G[j1,:] = 1
        G[j2,:] = 1

    return {"N":[(0, j) for j in N_traffic],"W":[(i, 0) for i in W_traffic],
            "S":[(x-1,j) for j in S_traffic],"E":[(i,y-1) for i in E_traffic]}

def move_vehicle(type2name, vtype, vi, vj, shadow, G):
    M, N = G.shape
    northmost = (vi == 0 and shadow[vi, vj+1] == type2name["E"])
    southmost = (vi == N - 1 and shadow[vi, vj-1] == type2name["E"])
    eastmost = (vj == M - 1 and shadow[vi+1, vj] == type2name["E"])
    westmost = (vj == 0 and shadow[vi - 1, vj] == type2name["E"])
    if northmost or southmost or eastmost or westmost:
        G[vi,vj] = type2name["R"]
        return {'veh_move':True, 'veh_exit':True}

    # E V
    #   R
    try:
        if shadow[vi+1, vj] == type2name["R"] and shadow[vi, vj -1] == type2name["E"]:
            G[vi,vj] = type2name["R"]
            G[vi+1, vj] = type2name[vtype]
            return {'veh_move':True, 'veh_exit':False}
    except IndexError:
        pass

    # R
    # V E
    try:
        if shadow[vi-1, vj] == type2name["R"] and shadow[vi, vj+1] == type2name["E"]:
            G[vi,vj] = type2name["R"]
            G[vi-1, vj] = type2name[vtype]
            return {'veh_move':True, 'veh_exit':False}
    except IndexError:
        pass
    # V R
    # E
    try:
        if shadow[vi,vj+1] == type2name["R"] and shadow[vi+1,vj] == type2name["E"]:
            G[vi,vj] = type2name["R"]
            G[vi, vj+1] = type2name[vtype]
            return {'veh_move':True, 'veh_exit':False}
    except IndexError:
        pass
    # E
    # R V
    try:
        if shadow[vi,vj-1] == type2name["R"] and shadow[vi-1, vj] == type2name["E"]:
            G[vi,vj] = type2name["R"]
            G[vi, vj-1] = type2name[vtype]
            return {'veh_move':True, 'veh_exit':False}
    except IndexError:
        pass


def cars_pass_bikes(type2name,b_rate,shadow,G):
    '''
    perform a regime search for cars that could pass bicycles
    args: shadow: current state of the system
          G     : next state of the system

    Bike Passing Regimes:

     R R R R  eastbound  -> move C from (i+1, j  ) to (i+1, j+3)
     C B R R

     R R B C  westbound  -> move C from (i  , j+3) to (i  , j  )
     R R R R

     R R      northbound -> move C from (i+3, j+1) to (i  , j+1)
     R R
     R B
     R C

     C R      southbound -> move C from (i  , j  ) to (i+3, j  )
     B R
     R R
     R R
    '''

    bikepass_regimes = {'Ebound':[np.array([[1,1,1,1],
                                           [2,3+i,1,1]]) for i in range(b_rate)],
                        'Wbound':[np.array([[1,1,3+i,2],
                                            [1,1,1,1]]) for i in range(b_rate)],
                        'Nbound':[np.array([[1,1],
                                            [1,1],
                                            [1,3+i],
                                            [1,2]]) for i in range(b_rate)],
                        'Sbound':[np.array([[2,1],
                                           [3+i,1],
                                           [1,1],
                                           [1,1]]) for i in range(b_rate)]}

    carmoves = 0
    for direction in bikepass_regimes:
        for regime in bikepass_regimes[direction]:
            regimei,regimej = np.where(find_regime(regime,shadow) == 1)
            for i,j, in zip(regimei,regimej):
                R = Passing_regime(direction,(i,j))
                G[R.old_loc] = type2name['R']
                G[R.new_loc] = type2name['C']
                carmoves += 1

    return carmoves

Plotting functions

Below shows the code that is used to plot the system either after full evaluation or at each timestep

In [4]:
def plot_grid(G,ts,fs,es,t):
    """
    plot the system at timestep t
    
    args:
        :ts - list of timesteps
        :fs - (list of 2-tups) flow statistics per timestep
        :es - exit statistics per timestep (# of vehicles leaving the system, ratio of in/out)
    returns:
        :average car, bicycle flow rates and average exit stats
    """
    plt.close()
    fig = plt.figure(figsize=(10,15))
    gridspec.GridSpec(3,2)

    plt.subplot2grid((3,2), (0,0), colspan=2, rowspan=2)
    plt.axis('off')
    bounds = [-1,0,1,2,3,4]
    cmap = colors.ListedColormap(['tomato','honeydew','lightgrey','slateblue','gold'])
    norm = colors.BoundaryNorm(bounds,5)
    plt.imshow(G, interpolation='nearest', cmap=cmap, norm=norm)

    plt.subplot2grid((3,2), (2,0), colspan=2, rowspan=1)
    carstats = [f[0] for f in fs]
    bikstats = [f[1] for f in fs]
    avg_cfs = np.mean([cs for cs in carstats if cs >= 0])
    avg_2half_cfs = np.mean([cs for cs in carstats[len(carstats)//3:] if cs >= 0])
    avg_bfs = np.mean([bs for bs in bikstats if bs >= 0])
    avg_exs = np.mean([e for e in es if e >= 0])

    plt.plot([ts[0],ts[-1]],[avg_cfs,avg_cfs],color = 'royalblue',alpha=0.6,label='avg. prop. for cars')
    plt.plot([ts[0],ts[-1]],[avg_2half_cfs,avg_2half_cfs],color = 'darkslateblue',alpha=0.6,label='avg. prop. for cars over last third of t interval')
    plt.scatter(ts[:-1], carstats[:-1], s=50,alpha=0.8, facecolors='none', edgecolors='slateblue',label='prop. of cars moving in system at timestep t')
    plt.scatter(ts[-1], carstats[-1], s=50, facecolors='none', edgecolors='navy')
    plt.plot([ts[0],ts[-1]],[avg_bfs,avg_bfs],color = 'khaki',alpha=0.6,label='avg. prop. for bikes')
    plt.scatter(ts[:-1], bikstats[:-1], s=50,alpha=0.8, facecolors='none', edgecolors='gold',label='prop. of bikes moving in system at timestep t')
    plt.scatter(ts[-1], bikstats[-1], s=50, facecolors='none', edgecolors='darkgoldenrod')
    #plt.scatter(ts, es, s=50,alpha=0.4, facecolors='none', edgecolors='crimson',label='out/in ratio for veh in system')
    plt.plot([ts[0],ts[-1]],[avg_exs,avg_exs],color = 'crimson',alpha=0.3,label='avg. out/in ratio for veh in system')

    plt.ylim(-0.1,1.1)
    plt.xlabel('timestep, t',fontsize = 10)
    plt.ylabel('proportion of vehicles that moved at timestep, t',fontsize = 10)
    plt.legend(loc=3,fontsize = 8)
    plt.savefig(os.path.join(place,"CAgif_timestep_{:04d}".format(t)),bbox_inches='tight')

    return avg_cfs, avg_2half_cfs, avg_bfs

def plot_final_stats(ts,fs,es):
    """
    plot the bottom plot -- the flow rates and exit statistics per timestep
    """
    plt.close()
    fig = plt.figure(figsize=(15,5))

    carstats = [f[0] for f in fs]
    bikstats = [f[1] for f in fs]
    avg_cfs = np.mean([cs for cs in carstats if cs >= 0])
    avg_2half_cfs = np.mean([cs for cs in carstats[len(carstats)//3:] if cs >= 0])
    avg_bfs = np.mean([bs for bs in bikstats if bs >= 0])
    avg_exs = np.mean([e for e in es if e >= 0])

    plt.plot([ts[0],ts[-1]],[avg_cfs,avg_cfs],color = 'royalblue',alpha=0.6,label='avg. prop. for cars')
    plt.plot([ts[0],ts[-1]],[avg_2half_cfs,avg_2half_cfs],color = 'darkslateblue',alpha=0.6,label='avg. prop. for cars over last third of t interval')
    plt.scatter(ts[:-1], carstats[:-1], s=50,alpha=0.8, facecolors='none', edgecolors='slateblue',label='prop. of cars moving in system at timestep t')
    plt.scatter(ts[-1], carstats[-1], s=50, facecolors='none', edgecolors='navy')
    plt.plot([ts[0],ts[-1]],[avg_bfs,avg_bfs],color = 'khaki',alpha=0.6,label='avg. prop. for bikes')
    plt.scatter(ts[:-1], bikstats[:-1], s=50,alpha=0.8, facecolors='none', edgecolors='gold',label='prop. of bikes moving in system at timestep t')
    plt.scatter(ts[-1], bikstats[-1], s=50, facecolors='none', edgecolors='darkgoldenrod')
    #plt.scatter(ts, es, s=50,alpha=0.4, facecolors='none', edgecolors='crimson',label='out/in ratio for veh in system')
    plt.plot([ts[0],ts[-1]],[avg_exs,avg_exs],color = 'crimson',alpha=0.3,label='avg. out/in ratio for veh in system')

    plt.ylim(-0.1,1.1)
    plt.xlabel('timestep, t',fontsize = 10)
    plt.ylabel('proportion of vehicles that moved at timestep, t',fontsize = 10)
    plt.legend(loc=3,fontsize = 7)
    plt.savefig("figs/final_stats.pdf",bbox_inches='tight')

Main routine

In [6]:
# Our main arguments to change:
#                             : b_rate = time delay for bicycles
#                             : bike2car = prob. that a veh entering
#                                          the system is a bicycle
#                             : time = total time to run sys for
#                             : flow rates

b_rate = 3
for i in range(b_rate):
    type2name['B'+str(i)] = 3+i

coin_flip = lambda p: random.random() < p
place = "figs/traffic_gif/"

os.system("rm {}".format(os.path.join(place,"*")))
plt.close()
bike2car  = 0.2  # proportion of vehicles that enter the system that are bikes
dim = 100        # dimensions of the system/grid
time = 100      # total time system runs for
system = np.zeros((dim,dim))

# flow_rates: (dict) - probability of vehicles entering the system
#                     from N,S,E, or W directions @ time t
allflowseq = True
if allflowseq:
    flowrate = 0.4
    flows = {d:flowrate for d in set("NSEW")}
else:
    flows = {"N":0.2,
             "S":0.1,
             "E":0.05,
             "W":0.15}

# incoming traffic zones:|
N_traffic = [dim//4,dim//2,dim - dim//4]
W_traffic = [dim//4,dim//2,dim - dim//4]

# laying roads
traffic_in    = lay_roads(N_traffic,W_traffic,system)
intersections, system = place_intersections(system)

flowstats = [] # number of vehicles that move per time step / total vehicles in system
exitflows = []
timesteps = []

for t in range(time):
    shadow = system.copy()
    carmove = 0
    bikemove = 0
    # vehicles entering the system
    veh_in = 0
    veh_out = 0
    for direction in flows:
        for t_in in traffic_in[direction]:
            if shadow[t_in] == type2name["R"] and coin_flip(flows[direction]):
                if coin_flip(bike2car):
                    for i in range(b_rate):
                        if (t+i) % b_rate == 0:
                            system[t_in] = type2name["B"+str(i)]   # a bike appears
                else:
                    system[t_in] = type2name["C"]   # a car appears
                veh_in +=1

    # do general, nonintersection related movement
    for i in range(b_rate):
        if (t+i) % b_rate == 0:
            # find and move bikes
            for bi, bj in zip(*np.where(shadow == type2name["B"+str(i)])):
                mv = move_vehicle(type2name,"B"+str(i),bi, bj, shadow, system)
                if mv:
                    if mv['veh_move']:
                        bikemove +=1
                        if mv['veh_exit']:
                            veh_out +=1

    for ci, cj in zip(*np.where(shadow == type2name["C"])):
        mv = move_vehicle(type2name,"C",ci,cj,shadow,system)
        if mv:
            if mv['veh_move']:
                carmove +=1
                if mv['veh_exit']:
                    veh_out +=1

    # look at intersections, move appropriately
    # who's at an intersection?
    for I in intersections:
        for direction in I.nsew.keys():
            if shadow[I.nsew[direction]] > 1 and direction not in I.queue:
                I.queue.append(direction)

        if I.queue:
            dmover = I.queue.pop(0)
            i,j = I.nsew[dmover]
            next_coord = random.choice(I.potential_locs[dmover])
            if shadow[next_coord] == type2name["R"]:
                system[next_coord] = shadow[i,j]
                system[i,j] = type2name["R"]
                if shadow[i,j] == 'C':
                    carmove +=1
                elif shadow[i,j] =='B':
                    bikemove +=1

    carmove += cars_pass_bikes(type2name,b_rate,shadow,system)

    # total number of C and B at start of timestep:
    uni, cts = np.unique(shadow, return_counts = True)
    cnts = dict(zip(uni,cts))
    totcar = cnts.get(type2name['C'],0)
    totbike = 0
    totbikecanmove = 0
    Bs = [c for c in cnts if c > type2name['C']]
    for B in Bs:
        totbike += cnts[B]
    for i in range(b_rate):
        if (t+i) % b_rate == 0 and type2name['B{}'.format(str(i))] in cnts:
            totbikecanmove = cnts[type2name['B'+str(i)]]

    if totcar == 0:
        carstat = -1
    else:
        carstat = carmove/totcar #+ 0.01

    if veh_in == 0:
        existat = -1
    else:
        existat = veh_out/veh_in

    if totbike == 0:
        bikestat1 = -1
    else:
        bikestat1 = bikemove/totbike #- 0.01

    if totbikecanmove == 0:
        bikestat2 = -1
    else:
        bikestat2 = bikemove/totbikecanmove

    os.system('scp figs/final_stats.pdf figs/final_stats'+path+'.pdf')
    flowstats.append((carstat,bikestat1,bikestat2))
    exitflows.append(existat)
    timesteps.append(t)

    avg_cfs, avg_2half_cfs , avg_bfs= plot_grid(system,timesteps,flowstats,exitflows,t)

    if t == time - 1:
        plot_final_stats(timesteps,flowstats,exitflows)


# Get all gif image files
imgfiles = sorted(glob(place+'*'))

# Make GIF!!!!   Naming convention:
#                                t: number of timesteps
#                               br: bikerate (bike speed delay)
#                              b2c: proportion of bikes in the system
#                            flows: single flow rate if uniform from every direction
#                                   or flow for each direction
#                   avg cflow stat: average proportion of cars moving per time step over entire time interval
#                                   average proportion of cars moving per time step over last 1/3 of time interval
#                   avg bflow stat: average proportion of bikes moving per time step

if allflowseq:
    path = '_t-{}_br-{}_b2c-{}_flows-{}_avgcf-{}-{}_avgbf-{}'.format(time,b_rate,round(bike2car,2),round(flows['N'],2),round(avg_cfs,3),round(avg_2half_cfs,3),round(avg_bfs,3))
else:
    path = '_t-{}_br-{}_b2c-{}_flowN-{}-S-{}-E-{}-W-{}_avgcf-{}-{}_avgbf-{}'.format(time,b_rate,round(bike2car,2),round(flows['N'],2),round(flows['S'],2),round(flows['E'],2),round(flows['W'],2),round(avg_cfs,3),round(avg_2half_cfs,3),round(avg_bfs,3))
with imageio.get_writer('figs/traffic_gif'+path+'.gif', mode='I') as writer:
    for filename in imgfiles:
        image = imageio.imread(filename)
        writer.append_data(image)

os.system('scp figs/final_stats.pdf figs/final_stats'+path+'.pdf')
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-6-8126f8d51af5> in <module>()
     46 
     47 # laying roads
---> 48 traffic_in    = lay_roads(N_traffic,W_traffic,system)
     49 intersections, system = place_intersections(system)
     50 

NameError: name 'lay_roads' is not defined