from concurrent.futures import ThreadPoolExecutor
import csv
import datetime
import os
import sqlite3
import requests
# Samsara API bearer token, read once at import time.
# Raises KeyError immediately if SAMSARA_API_TOKEN is not set.
bearer = os.environ["SAMSARA_API_TOKEN"]
# Endpoint for historical vehicle telemetry (GPS points, etc.).
VEHICLE_STATS_API = "https://api.samsara.com/fleet/vehicles/stats/history"
# Beta endpoint mapping drivers to the vehicles they were assigned.
ASSIGNMENTS_API = "https://api.samsara.com/beta/fleet/driver-vehicle-assignments"
# Worker-thread count; also the number of time slices the range is split into.
THREADS = 3
def get_vehicle_stats_history(stat_types, start_time, end_time, cursor=None):
    """Fetch one page of historical vehicle stats from the Samsara API.

    stat_types: iterable of stat-type names (e.g. ["gps"]).
    start_time/end_time: naive datetimes, serialized as UTC ("Z") timestamps.
    cursor: optional pagination cursor from a previous response's endCursor.
    Returns the raw requests.Response (caller decodes JSON).
    """
    query = {
        "types": ",".join(stat_types),
        "startTime": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "endTime": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    if cursor:
        query["after"] = cursor
    auth_headers = {
        "Authorization": "Bearer " + bearer,
        "X-Samsara-Hidden": "true",
    }
    return requests.get(
        url=VEHICLE_STATS_API, timeout=10, headers=auth_headers, params=query
    )
def get_vehicle_assignments(start_time, end_time, cursor=None):
    """Fetch one page of driver-vehicle assignments from the Samsara API.

    start_time/end_time: naive datetimes, serialized as UTC ("Z") timestamps.
    cursor: optional pagination cursor from a previous response's endCursor.
    Returns the raw requests.Response (caller decodes JSON).
    """
    query = {
        "startTime": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "endTime": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    if cursor:
        query["after"] = cursor
    auth_headers = {
        "Authorization": "Bearer " + bearer,
        "X-Samsara-Hidden": "true",
    }
    return requests.get(
        url=ASSIGNMENTS_API, timeout=10, headers=auth_headers, params=query
    )
def split_date_range(start_time, end_time, num_intervals):
    """Yield num_intervals contiguous (start, end) datetime pairs covering the range.

    Boundaries are computed from the interval index (start + i * width)
    rather than accumulated, because timedelta division truncates to whole
    microseconds: the original accumulation could drift so the final
    interval ended short of end_time, silently dropping the tail of the
    requested range. The last pair is pinned to end exactly at end_time.

    Raises ZeroDivisionError if num_intervals is 0 (as the original did on
    first iteration).
    """
    width = (end_time - start_time) / num_intervals
    for i in range(num_intervals):
        chunk_start = start_time + i * width
        if i == num_intervals - 1:
            chunk_end = end_time  # guarantee full coverage of the range
        else:
            chunk_end = start_time + (i + 1) * width
        yield (chunk_start, chunk_end)
def parse_and_write_vehicle_stats_history_response(resp, types):
    """Persist GPS data points from a stats-history response into stats.db.

    resp: decoded JSON body from the vehicle stats/history endpoint.
    types: the stat types that were requested; only "gps" is persisted,
        all other types are ignored.

    Fixes over the original:
    - membership test instead of looping over `types`, so a duplicated
      "gps" entry no longer inserts every row twice;
    - vehicles without a "gps" key are skipped instead of raising KeyError;
    - the connection is closed in a `finally` so it cannot leak when an
      insert raises;
    - rows are batched with executemany instead of per-row execute.
    """
    if "gps" not in types:
        return
    rows = [
        (
            vehicle["id"],
            point["time"],
            point["latitude"],
            point["longitude"],
        )
        for vehicle in resp["data"]
        for point in vehicle.get("gps", [])
    ]
    con = sqlite3.connect("stats.db")
    try:
        con.executemany("INSERT INTO data VALUES (?, ?, ?, ?)", rows)
        con.commit()
    finally:
        con.close()
def get_vehicle_stats_history_parallelized(stat_types, start_time, end_time):
    """Replicate vehicle stats for the given range using a thread pool.

    The range is split into THREADS slices with one initial request per
    slice. Responses are written to sqlite on the main thread; whenever a
    response carries a pagination cursor, a follow-up request for the same
    slice is submitted back to the pool.
    """
    print(f"Starting stats replication. Types: {stat_types}, Start: {start_time}, End: {end_time}")
    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        pending = [
            (executor.submit(get_vehicle_stats_history, stat_types, lo, hi), stat_types, (lo, hi))
            for lo, hi in split_date_range(start_time, end_time, THREADS)
        ]
        while pending:
            fut, slice_types, window = pending.pop(0)
            payload = fut.result().json()
            parse_and_write_vehicle_stats_history_response(payload, slice_types)
            print("Completed: " + str(window))
            next_cursor = payload["pagination"]["endCursor"]
            if next_cursor:
                follow_up = executor.submit(
                    get_vehicle_stats_history,
                    slice_types,
                    window[0],
                    window[1],
                    next_cursor,
                )
                pending.append((follow_up, slice_types, window))
def parse_and_write_vehicle_assignments_response(resp):
    """Persist driver-vehicle assignments from an API response into stats.db.

    resp: decoded JSON body from the driver-vehicle-assignments endpoint.

    Fixes over the original: the connection is closed in a `finally` so it
    cannot leak when an insert raises, and rows are batched with
    executemany instead of per-row execute.
    """
    rows = [
        (
            assignment["vehicle"]["id"],
            assignment["startTime"],
            assignment["endTime"],
            assignment["driver"]["id"],
        )
        for assignment in resp["data"]
    ]
    con = sqlite3.connect("stats.db")
    try:
        con.executemany("INSERT INTO assignments VALUES (?, ?, ?, ?)", rows)
        con.commit()
    finally:
        con.close()
def get_vehicle_assignments_parallelized(start_time, end_time):
    """Replicate driver-vehicle assignments for the given range in parallel.

    The range is split into THREADS slices with one initial request per
    slice. Responses are written to sqlite on the main thread; whenever a
    response carries a pagination cursor, a follow-up request for the same
    slice is submitted back to the pool.
    """
    print(f"Starting assignment replication. Start: {start_time}, End: {end_time}")
    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        pending = [
            (executor.submit(get_vehicle_assignments, lo, hi), (lo, hi))
            for lo, hi in split_date_range(start_time, end_time, THREADS)
        ]
        while pending:
            fut, window = pending.pop(0)
            payload = fut.result().json()
            parse_and_write_vehicle_assignments_response(payload)
            print("Completed: " + str(window))
            next_cursor = payload["pagination"]["endCursor"]
            if next_cursor:
                follow_up = executor.submit(
                    get_vehicle_assignments, window[0], window[1], next_cursor
                )
                pending.append((follow_up, window))
def write_data_to_csv(resp):
    """Flatten GPS points from a stats response into data.csv.

    Each row is [time, vehicle id, latitude, longitude]; rows are sorted
    by time and written under a fixed header row.
    """
    rows = [
        [point["time"], vehicle["id"], point["latitude"], point["longitude"]]
        for vehicle in resp["data"]
        for point in vehicle["gps"]
    ]
    rows.sort(key=lambda row: row[0])
    with open("data.csv", "w", newline="") as out:
        writer = csv.writer(out)
        writer.writerow(["Time", "Vehicle ID", "Latitude", "Longitude"])
        writer.writerows(rows)
if __name__ == "__main__":
    # Recreate the sqlite tables from scratch on every run so reruns
    # never duplicate previously replicated rows.
    ddl_statements = (
        "DROP TABLE IF EXISTS data",
        "DROP TABLE IF EXISTS assignments",
        "CREATE TABLE data (vehicle_id text, time datetime, latitude real, longitude real)",
        "CREATE TABLE assignments (vehicle_id text, start_time datetime, end_time datetime, driver_id text)",
    )
    con = sqlite3.connect("stats.db")
    cur = con.cursor()
    for statement in ddl_statements:
        cur.execute(statement)
    con.commit()
    con.close()

    # Replicate a fixed 30-minute window of GPS stats and assignments.
    start_time = datetime.datetime(2022, 1, 1, 1, 0, 0)
    end_time = datetime.datetime(2022, 1, 1, 1, 30, 0)
    stat_types = ["gps"]
    get_vehicle_stats_history_parallelized(stat_types, start_time, end_time)
    get_vehicle_assignments_parallelized(start_time, end_time)