# Python source file: 123 lines, 4.5 KiB
import os
|
|
from dotenv import load_dotenv
|
|
import requests
|
|
import datetime
|
|
|
|
# Let python-dotenv populate the process environment before the keys are read.
load_dotenv()

# API credentials, read from the environment; each one is None when unset.
OKAPI_POSTE_KEY = os.environ.get('OKAPI_POSTE_KEY')
DHL_KEY = os.environ.get('DHL_KEY')
DHL_SECRET = os.environ.get('DHL_SECRET')

# Base URLs of the two carrier tracking services queried by this module.
POSTE_ENDPOINT = 'https://api.laposte.fr/suivi/v2/idships/'
DHL_ENDPOINT = 'https://api-eu.dhl.com/track/shipments'
|
|
|
|
|
|
class Event:
    """One dated step in a parcel's tracking history.

    The constructor stores its four arguments unchanged as the attributes
    ``date``, ``label``, ``description`` and ``location``. Events order
    themselves by ``date``, which is what lets a list of events be sorted.
    """

    def __init__(self, date, label, description="", location=""):
        self.date = date
        self.label = label
        self.description = description
        self.location = location

    def __lt__(self, other):
        """Return True when this event's ``date`` is earlier than ``other``'s.

        Returns ``NotImplemented`` when ``other`` has no ``date`` attribute so
        that Python raises its usual ``TypeError`` for unsupported operands.
        """
        try:
            other_date = other.date
        except AttributeError:
            return NotImplemented
        # Use the operator rather than calling ``__lt__`` directly: the dunder
        # returns the (truthy) ``NotImplemented`` sentinel for mixed but
        # comparable types, whereas ``<`` also tries the reflected comparison.
        return self.date < other_date

    def __str__(self):
        """Return a one-line, human-readable dump of the event."""
        # f-string formatting (instead of ``+`` concatenation) keeps this
        # working when a field is not a ``str``.
        # NOTE: the "desciption" spelling is part of the existing output and
        # is kept byte-for-byte for anything that already parses it.
        return (
            f"Event<date:{self.date!s}; label:{self.label!s}; "
            f"desciption:{self.description!s}; location:{self.location!s}>"
        )

    def __repr__(self):
        """Use the same text as ``str()`` so events read well inside lists."""
        return self.__str__()
|
|
|
|
class Tracking:
    """Carrier-independent state for one tracked parcel.

    Holds the tracking number, the list of collected events, the service
    name and the estimated arrival. ``track`` does nothing in this class.
    """

    def __init__(self, tracking_num):
        self.events = []
        self.eta = None
        self.service = ""
        self.tracking_num = tracking_num

    def track(self):
        """Do nothing and return None; carrier subclasses provide the lookup."""
        return None

    def get_history(self):
        """Sort ``self.events`` in place and return that same list object."""
        history = self.events
        history.sort()
        return history

    def get_last_event(self):
        """Sort ``self.events`` in place and return its last item, or None if empty."""
        self.events.sort()
        if self.events:
            return self.events[-1]
        return None
|
|
|
|
class Poste_tracking(Tracking):
    """Tracking that fetches its data from the La Poste endpoint.

    ``track`` sends one GET request to ``POSTE_ENDPOINT + tracking_num``
    authenticated with ``OKAPI_POSTE_KEY`` and fills ``service``, ``events``
    and ``eta`` from the JSON reply.
    """

    # Seconds to wait for the HTTP reply; without a timeout ``requests`` can
    # block indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 10

    # Event code -> display label (French, shown to the user as-is).
    codes = {
        "DR1": "Déclaratif réceptionné",
        "DR2": "Problème lors de la préparation",
        "PC1": "Pris en charge",
        "PC2": "Pris en charge dans le pays d'expédition",
        "ET1": "En cours de traitement",
        "ET2": "En cours de traitement dans le pays d'expédition",
        "ET3": "En cours de traitement dans le pays de destination",
        "ET4": "En cours de traitement dans un pays de transit",
        "EP1": "En attente de présentation",
        "DO1": "Entrée en Douane",
        "DO2": "Sortie de Douane",
        "DO3": "Retenu en Douane",
        "PB1": "Problème en cours",
        "PB2": "Problème résolu",
        "MD2": "Mis en distribution",
        "ND1": "Non distribuable",
        "AG1": "En attente d'être retiré au guichet",
        "RE1": "Retourné à l'expéditeur",
        "DI0": "Distribué en lot",
        "DI1": "Distribué",
        "DI2": "Distribué à l'expéditeur",
        "DI3": "Retardé",
    }

    def track(self):
        """Query the La Poste endpoint and refresh this object's state.

        ``service`` and ``eta`` are only overwritten when the reply carries
        the corresponding field. When the reply contains a ``shipment``
        object, ``events`` is replaced (in place) by the events of that
        reply, so calling ``track`` again refreshes the history instead of
        duplicating it. A reply without ``shipment`` leaves ``events`` alone.

        Raises:
            requests.exceptions.RequestException: on network failure or timeout.
            ValueError: if the reply is not JSON or a date is not ISO 8601.
            KeyError: if an event lacks its ``code`` or ``date`` field.
        """
        auth_headers = {'X-Okapi-Key': OKAPI_POSTE_KEY, 'Accept': 'application/json'}
        response = requests.get(
            POSTE_ENDPOINT + self.tracking_num,
            headers=auth_headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        js = response.json()

        shipment = js["shipment"] if "shipment" in js else None

        # The top-level lookup is the historical behaviour and still wins when
        # present. NOTE(review): the public La Poste schema nests ``product``
        # under ``shipment``, hence the fallback -- confirm against a live reply.
        if "product" in js:
            self.service = js["product"]
        elif shipment and "product" in shipment:
            self.service = shipment["product"]

        if not shipment:
            return

        parsed = []
        for jev in shipment.get("event", []):
            code = jev["code"]
            parsed.append(Event(
                datetime.datetime.fromisoformat(jev["date"]),
                # Fall back to the raw code so an unlisted code does not
                # abort the whole lookup with a KeyError.
                self.codes.get(code, code),
                description=jev.get("label", ""),
            ))
        # Slice assignment keeps the same list object for existing references.
        self.events[:] = parsed

        if "estimDate" in shipment:
            self.eta = datetime.datetime.fromisoformat(shipment["estimDate"])
|
|
|
|
class DHL_tracking(Tracking):
    """Tracking that fetches its data from the DHL shipment-tracking endpoint.

    ``track`` sends one GET request to ``DHL_ENDPOINT`` with the tracking
    number as the ``trackingNumber`` query parameter and ``DHL_KEY`` as the
    ``DHL-API-Key`` header, then fills ``service``, ``eta`` and ``events``.
    """

    # Seconds to wait for the HTTP reply; without a timeout ``requests`` can
    # block indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 10

    @staticmethod
    def _parse_timestamp(value):
        """Parse an ISO 8601 string, accepting a trailing ``Z`` for UTC."""
        # ``datetime.fromisoformat`` only understands the ``Z`` suffix from
        # Python 3.11 on; rewriting it as an explicit offset works everywhere.
        if value.endswith(("Z", "z")):
            value = value[:-1] + "+00:00"
        return datetime.datetime.fromisoformat(value)

    def track(self):
        """Query the DHL endpoint and refresh this object's state.

        Nothing is changed unless the reply holds exactly one shipment. In
        that case ``service`` is set, ``eta`` is taken from
        ``estimatedDeliveryTimeFrame.estimatedFrom`` and then overridden by
        ``estimatedTimeOfDelivery`` when present, and ``events`` is replaced
        (in place) by the events of the reply so that repeated calls do not
        accumulate duplicates.

        Raises:
            requests.exceptions.RequestException: on network failure or timeout.
            ValueError: if the reply is not JSON or a timestamp is not ISO 8601.
            KeyError: if the shipment lacks ``service`` or an event lacks
                ``timestamp``.
        """
        auth_headers = {'DHL-API-Key': DHL_KEY}
        query = {'trackingNumber': self.tracking_num}
        response = requests.get(
            DHL_ENDPOINT,
            params=query,
            headers=auth_headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        js = response.json()

        if "shipments" not in js or len(js["shipments"]) != 1:
            return
        shipment = js["shipments"][0]

        self.service = shipment["service"]

        if "estimatedDeliveryTimeFrame" in shipment:
            self.eta = self._parse_timestamp(
                shipment["estimatedDeliveryTimeFrame"]["estimatedFrom"])
        # Checked second on purpose: a precise delivery time wins over the
        # start of the delivery window.
        if "estimatedTimeOfDelivery" in shipment:
            self.eta = self._parse_timestamp(shipment["estimatedTimeOfDelivery"])

        parsed = []
        for jev in shipment.get("events", []):
            # Any level of the location object may be absent; degrade to an
            # empty locality instead of raising KeyError.
            address = (jev.get("location") or {}).get("address") or {}
            loc = address.get("addressLocality", "")
            parsed.append(Event(
                self._parse_timestamp(jev["timestamp"]),
                jev.get("statusCode", ""),
                description=jev.get("description", ""),
                location=loc,
            ))
        # Slice assignment keeps the same list object for existing references.
        self.events[:] = parsed
|
|
|
|
# Ad-hoc run executed whenever this module is loaded: the same parcel number
# is looked up through both carrier classes. The commented-out prints dump the
# results as a bracketed, comma-separated list when re-enabled.
_DEMO_TRACKING_NUM = "CC550445947DE"

# print('[')
Ptrack = Poste_tracking(_DEMO_TRACKING_NUM)
Ptrack.track()
# print(Ptrack.get_last_event())
# print(',')

# Chronotrack = Poste_tracking("XA420959322FR")
# Chronotrack.track()
# print(Chronotrack.get_last_event())
# print(',')

DHLtrack = DHL_tracking(_DEMO_TRACKING_NUM)
DHLtrack.track()
# print(DHLtrack.get_history())
# print(Ptrack.get_history())
# print(']')