
This recipe parallelizes historical API calls by time range, writes GPS data and assignment data to SQLite, and shows how to correlate the two datasets.
from concurrent.futures import ThreadPoolExecutor
import csv
import datetime
import os
import sqlite3

import requests

bearer = os.environ["SAMSARA_API_TOKEN"]
VEHICLE_STATS_API = "https://api.samsara.com/fleet/vehicles/stats/history"
ASSIGNMENTS_API = "https://api.samsara.com/beta/fleet/driver-vehicle-assignments"
THREADS = 3  # the time range is split into this many intervals, one per worker


def get_vehicle_stats_history(stat_types, start_time, end_time, cursor=None):
    # Fetch one page of historical stats for all vehicles in the given time range.
    headers = {
        "Authorization": "Bearer " + bearer,
        "X-Samsara-Hidden": "true",
    }
    params = {
        "types": ",".join(stat_types),
        "startTime": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "endTime": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    if cursor:
        params["after"] = cursor

    return requests.get(url=VEHICLE_STATS_API, timeout=10, headers=headers, params=params)


def get_vehicle_assignments(start_time, end_time, cursor=None):
    # Fetch one page of driver-vehicle assignments for the given time range.
    headers = {
        "Authorization": "Bearer " + bearer,
        "X-Samsara-Hidden": "true",
    }
    params = {
        "startTime": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "endTime": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    if cursor:
        params["after"] = cursor

    return requests.get(url=ASSIGNMENTS_API, timeout=10, headers=headers, params=params)


def split_date_range(start_time, end_time, num_intervals):
    # Yield num_intervals contiguous, equal-length (start, end) pairs covering the range.
    diff = (end_time - start_time) / num_intervals
    start_interval = start_time
    end_interval = start_time + diff

    for _ in range(num_intervals):
        yield (start_interval, end_interval)
        start_interval = end_interval
        end_interval = end_interval + diff


def parse_and_write_vehicle_stats_history_response(resp, types):
    con = sqlite3.connect("stats.db")
    cur = con.cursor()

    for vehicle in resp["data"]:
        vehicle_id = vehicle["id"]

        for stat_type in types:
            # Only GPS points are persisted in this recipe; skip any other stat types.
            if stat_type != "gps":
                continue

            # Vehicles with no data in the interval may omit the stat key entirely.
            for data_point in vehicle.get(stat_type, []):
                cur.execute(
                    "INSERT INTO data VALUES (?, ?, ?, ?)",
                    (
                        vehicle_id,
                        data_point["time"],
                        data_point["latitude"],
                        data_point["longitude"],
                    ),
                )

    con.commit()
    con.close()


def get_vehicle_stats_history_parallelized(stat_types, start_time, end_time):
    print(f"Starting stats replication. Types: {stat_types}, Start: {start_time}, End: {end_time}")
    timerange_generator = split_date_range(start_time, end_time, THREADS)

    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        futures_and_args = []

        for timerange in timerange_generator:
            future = executor.submit(get_vehicle_stats_history, stat_types, timerange[0], timerange[1])
            futures_and_args.append((future, stat_types, timerange))

        while futures_and_args:
            future, future_stat_types, timerange = futures_and_args.pop(0)
            resp = future.result().json()
            parse_and_write_vehicle_stats_history_response(resp, future_stat_types)
            print("Completed: " + str(timerange))

            # A non-empty endCursor means more pages remain for this interval;
            # re-submit the same range with the cursor to fetch the next page.
            end_cursor = resp["pagination"]["endCursor"]
            if end_cursor:
                future = executor.submit(
                    get_vehicle_stats_history,
                    future_stat_types,
                    timerange[0],
                    timerange[1],
                    end_cursor,
                )
                futures_and_args.append((future, future_stat_types, timerange))


def parse_and_write_vehicle_assignments_response(resp):
    con = sqlite3.connect("stats.db")
    cur = con.cursor()

    for assignment in resp["data"]:
        cur.execute(
            "INSERT INTO assignments VALUES (?, ?, ?, ?)",
            (
                assignment["vehicle"]["id"],
                assignment["startTime"],
                assignment["endTime"],
                assignment["driver"]["id"],
            ),
        )

    con.commit()
    con.close()


def get_vehicle_assignments_parallelized(start_time, end_time):
    print(f"Starting assignment replication. Start: {start_time}, End: {end_time}")
    timerange_generator = split_date_range(start_time, end_time, THREADS)

    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        futures_and_args = []

        for timerange in timerange_generator:
            future = executor.submit(get_vehicle_assignments, timerange[0], timerange[1])
            futures_and_args.append((future, timerange))

        while futures_and_args:
            future, timerange = futures_and_args.pop(0)
            resp = future.result().json()
            parse_and_write_vehicle_assignments_response(resp)
            print("Completed: " + str(timerange))

            # Same pagination pattern as the stats replication above.
            end_cursor = resp["pagination"]["endCursor"]
            if end_cursor:
                future = executor.submit(get_vehicle_assignments, timerange[0], timerange[1], end_cursor)
                futures_and_args.append((future, timerange))


def write_data_to_csv(resp):
    flattened_data = []

    for vehicle in resp["data"]:
        for data_point in vehicle["gps"]:
            flattened_data.append([
                data_point["time"],
                vehicle["id"],
                data_point["latitude"],
                data_point["longitude"],
            ])

    flattened_data = sorted(flattened_data, key=lambda x: x[0])

    with open("data.csv", "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Time", "Vehicle ID", "Latitude", "Longitude"])
        writer.writerows(flattened_data)


if __name__ == "__main__":
    con = sqlite3.connect("stats.db")
    cur = con.cursor()
    cur.execute("DROP TABLE IF EXISTS data")
    cur.execute("DROP TABLE IF EXISTS assignments")
    cur.execute(
        "CREATE TABLE data (vehicle_id text, time datetime, latitude real, longitude real)"
    )
    cur.execute(
        "CREATE TABLE assignments (vehicle_id text, start_time datetime, end_time datetime, driver_id text)"
    )
    con.commit()
    con.close()

    # Naive datetimes are formatted with a trailing "Z" by the request helpers, i.e. treated as UTC.
    start_time = datetime.datetime(2022, 1, 1, 1, 0, 0)
    end_time = datetime.datetime(2022, 1, 1, 1, 30, 0)
    stat_types = ["gps"]

    get_vehicle_stats_history_parallelized(stat_types, start_time, end_time)
    get_vehicle_assignments_parallelized(start_time, end_time)

Correlate replicated data

After both tables are populated, you can correlate GPS points with the driver assigned to the vehicle at that point in time:
SELECT *
FROM data
LEFT JOIN assignments
  ON data.vehicle_id = assignments.vehicle_id
  AND data.time >= assignments.start_time
  AND data.time < assignments.end_time;
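
You can run the same query from Python once both replication passes have finished; a minimal sketch:

import sqlite3

con = sqlite3.connect("stats.db")
rows = con.execute(
    """
    SELECT data.time, data.vehicle_id, data.latitude, data.longitude, assignments.driver_id
    FROM data
    LEFT JOIN assignments
      ON data.vehicle_id = assignments.vehicle_id
      AND data.time >= assignments.start_time
      AND data.time < assignments.end_time
    ORDER BY data.time
    """
).fetchall()
con.close()

for row in rows[:5]:
    print(row)

The plain string comparison is valid assuming both tables store timestamps in the same RFC 3339 UTC format, which sorts lexicographically in chronological order.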

How it works

1. Install dependencies

Run pip install requests to install the requests library.

2. Wrap API calls

The script defines helpers for GET /fleet/vehicles/stats/history and GET /fleet/driver-vehicle-assignments.

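For example, you can smoke-test the stats helper on a short range before kicking off a full replication (a minimal sketch, assuming SAMSARA_API_TOKEN holds a valid API token):

import datetime

probe_start = datetime.datetime(2022, 1, 1, 1, 0, 0)
probe_end = datetime.datetime(2022, 1, 1, 1, 5, 0)

resp = get_vehicle_stats_history(["gps"], probe_start, probe_end)
resp.raise_for_status()  # surfaces auth or rate-limit errors immediately
print(len(resp.json()["data"]), "vehicles returned")
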
3. Split the time range

The requested time range is split into intervals so calls can run in parallel.

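With THREADS = 3, the script's 30-minute range yields three contiguous 10-minute intervals:

list(split_date_range(
    datetime.datetime(2022, 1, 1, 1, 0, 0),
    datetime.datetime(2022, 1, 1, 1, 30, 0),
    3,
))
# [(datetime.datetime(2022, 1, 1, 1, 0),  datetime.datetime(2022, 1, 1, 1, 10)),
#  (datetime.datetime(2022, 1, 1, 1, 10), datetime.datetime(2022, 1, 1, 1, 20)),
#  (datetime.datetime(2022, 1, 1, 1, 20), datetime.datetime(2022, 1, 1, 1, 30))]
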
4. Write API responses to SQLite

GPS points are inserted into the data table, and driver-vehicle assignments are inserted into the assignments table.

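To see the parser in isolation, you can feed it a hand-built response in the shape the script expects (the values below are made up; run this only after the main block has created the tables):

sample = {
    "data": [{
        "id": "281474976710644",
        "gps": [{"time": "2022-01-01T01:00:05Z", "latitude": 37.7749, "longitude": -122.4194}],
    }],
    "pagination": {"endCursor": "", "hasNextPage": False},
}
parse_and_write_vehicle_stats_history_response(sample, ["gps"])
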
5. Parallelize calls

A thread pool runs one request per time interval. When a response carries a pagination cursor, the next page for that interval is re-submitted to the pool, so pagination of one interval never blocks the others.

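Here is the re-submission pattern in isolation (a toy fetch_page stands in for the API call; the names here are illustrative, not part of the Samsara API):

from concurrent.futures import ThreadPoolExecutor

def fetch_page(interval, cursor=None):
    # Stand-in for one API call: pretend every interval has exactly two pages.
    rows = [f"{interval} page {cursor or 1}"]
    next_cursor = 2 if cursor is None else None
    return rows, next_cursor

with ThreadPoolExecutor(max_workers=3) as pool:
    pending = [(pool.submit(fetch_page, iv), iv) for iv in ("A", "B", "C")]
    while pending:
        future, iv = pending.pop(0)
        rows, next_cursor = future.result()
        print(rows)
        if next_cursor:  # re-submit so the next page runs alongside other intervals' work
            pending.append((pool.submit(fetch_page, iv, next_cursor), iv))
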
6. Optionally write CSV

The write_data_to_csv helper flattens a single vehicle stats response into data.csv, sorted by timestamp.

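For example, using the helper and time range defined above (note that each call overwrites data.csv and covers one response page):

resp = get_vehicle_stats_history(["gps"], start_time, end_time).json()
write_data_to_csv(resp)
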
7. Correlate assignments

Join GPS rows to assignment rows by matching vehicle IDs and checking that each GPS timestamp falls within an assignment's start and end times, as in the query shown above.

See Historical Telematics and Driver-Vehicle Assignments for related API details.