1
0
Fork 0
mirror of https://github.com/seanmorley15/AdventureLog.git synced 2025-08-05 13:15:18 +02:00
AdventureLog/backend/server/worldtravel/management/commands/download-countries.py

517 lines
No EOL
22 KiB
Python

import os
from django.core.management.base import BaseCommand
import requests
from worldtravel.models import Country, Region, City
from django.db import transaction
import ijson
import gc
import tempfile
import sqlite3
from contextlib import contextmanager
from django.conf import settings
COUNTRY_REGION_JSON_VERSION = settings.COUNTRY_REGION_JSON_VERSION
media_root = settings.MEDIA_ROOT
def saveCountryFlag(country_code):
# For standards, use the lowercase country_code
country_code = country_code.lower()
flags_dir = os.path.join(media_root, 'flags')
# Check if the flags directory exists, if not, create it
if not os.path.exists(flags_dir):
os.makedirs(flags_dir)
# Check if the flag already exists in the media folder
flag_path = os.path.join(flags_dir, f'{country_code}.png')
if os.path.exists(flag_path):
print(f'Flag for {country_code} already exists')
return
res = requests.get(f'https://flagcdn.com/h240/{country_code}.png'.lower())
if res.status_code == 200:
with open(flag_path, 'wb') as f:
f.write(res.content)
print(f'Flag for {country_code} downloaded')
else:
print(f'Error downloading flag for {country_code}')
class Command(BaseCommand):
help = 'Imports the world travel data with minimal memory usage'
def add_arguments(self, parser):
parser.add_argument('--force', action='store_true', help='Force download the countries+regions+states.json file')
parser.add_argument('--batch-size', type=int, default=500, help='Batch size for database operations')
@contextmanager
def _temp_db(self):
"""Create a temporary SQLite database for intermediate storage"""
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
temp_db_path = f.name
try:
conn = sqlite3.connect(temp_db_path)
conn.execute('''CREATE TABLE temp_countries (
country_code TEXT PRIMARY KEY,
name TEXT,
subregion TEXT,
capital TEXT,
longitude REAL,
latitude REAL
)''')
conn.execute('''CREATE TABLE temp_regions (
id TEXT PRIMARY KEY,
name TEXT,
country_code TEXT,
longitude REAL,
latitude REAL
)''')
conn.execute('''CREATE TABLE temp_cities (
id TEXT PRIMARY KEY,
name TEXT,
region_id TEXT,
longitude REAL,
latitude REAL
)''')
conn.commit()
yield conn
finally:
conn.close()
try:
os.unlink(temp_db_path)
except OSError:
pass
def handle(self, **options):
force = options['force']
batch_size = options['batch_size']
countries_json_path = os.path.join(settings.MEDIA_ROOT, f'countries+regions+states-{COUNTRY_REGION_JSON_VERSION}.json')
# Download or validate JSON file
if not os.path.exists(countries_json_path) or force:
self.stdout.write('Downloading JSON file...')
res = requests.get(f'https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/{COUNTRY_REGION_JSON_VERSION}/json/countries%2Bstates%2Bcities.json')
if res.status_code == 200:
with open(countries_json_path, 'w') as f:
f.write(res.text)
self.stdout.write(self.style.SUCCESS('JSON file downloaded successfully'))
else:
self.stdout.write(self.style.ERROR('Error downloading JSON file'))
return
elif not os.path.isfile(countries_json_path):
self.stdout.write(self.style.ERROR('JSON file is not a file'))
return
elif os.path.getsize(countries_json_path) == 0:
self.stdout.write(self.style.ERROR('JSON file is empty'))
return
elif Country.objects.count() == 0 or Region.objects.count() == 0 or City.objects.count() == 0:
self.stdout.write(self.style.WARNING('Some data is missing. Re-importing all data.'))
else:
self.stdout.write(self.style.SUCCESS('Latest data already imported.'))
return
self.stdout.write(self.style.SUCCESS('Starting ultra-memory-efficient import process...'))
# Use temporary SQLite database for intermediate storage
with self._temp_db() as temp_conn:
self.stdout.write('Step 1: Parsing JSON and storing in temporary database...')
self._parse_and_store_temp(countries_json_path, temp_conn)
self.stdout.write('Step 2: Processing countries...')
self._process_countries_from_temp(temp_conn, batch_size)
self.stdout.write('Step 3: Processing regions...')
self._process_regions_from_temp(temp_conn, batch_size)
self.stdout.write('Step 4: Processing cities...')
self._process_cities_from_temp(temp_conn, batch_size)
self.stdout.write('Step 5: Cleaning up obsolete records...')
self._cleanup_obsolete_records(temp_conn)
self.stdout.write(self.style.SUCCESS('All data imported successfully with minimal memory usage'))
def _parse_and_store_temp(self, json_path, temp_conn):
"""Parse JSON once and store in temporary SQLite database"""
country_count = 0
region_count = 0
city_count = 0
with open(json_path, 'rb') as f:
parser = ijson.items(f, 'item')
for country in parser:
country_code = country['iso2']
country_name = country['name']
country_subregion = country['subregion']
country_capital = country['capital']
longitude = round(float(country['longitude']), 6) if country['longitude'] else None
latitude = round(float(country['latitude']), 6) if country['latitude'] else None
# Store country
temp_conn.execute('''INSERT OR REPLACE INTO temp_countries
(country_code, name, subregion, capital, longitude, latitude)
VALUES (?, ?, ?, ?, ?, ?)''',
(country_code, country_name, country_subregion, country_capital, longitude, latitude))
country_count += 1
# Download flag (do this during parsing to avoid extra pass)
saveCountryFlag(country_code)
# Process regions/states
if country['states']:
for state in country['states']:
state_id = f"{country_code}-{state['state_code']}"
state_name = state['name']
state_lat = round(float(state['latitude']), 6) if state['latitude'] else None
state_lng = round(float(state['longitude']), 6) if state['longitude'] else None
temp_conn.execute('''INSERT OR REPLACE INTO temp_regions
(id, name, country_code, longitude, latitude)
VALUES (?, ?, ?, ?, ?)''',
(state_id, state_name, country_code, state_lng, state_lat))
region_count += 1
# Process cities
if 'cities' in state and state['cities']:
for city in state['cities']:
city_id = f"{state_id}-{city['id']}"
city_name = city['name']
city_lat = round(float(city['latitude']), 6) if city['latitude'] else None
city_lng = round(float(city['longitude']), 6) if city['longitude'] else None
temp_conn.execute('''INSERT OR REPLACE INTO temp_cities
(id, name, region_id, longitude, latitude)
VALUES (?, ?, ?, ?, ?)''',
(city_id, city_name, state_id, city_lng, city_lat))
city_count += 1
else:
# Country without states - create default region
state_id = f"{country_code}-00"
temp_conn.execute('''INSERT OR REPLACE INTO temp_regions
(id, name, country_code, longitude, latitude)
VALUES (?, ?, ?, ?, ?)''',
(state_id, country_name, country_code, None, None))
region_count += 1
# Commit periodically to avoid memory buildup
if country_count % 100 == 0:
temp_conn.commit()
self.stdout.write(f' Parsed {country_count} countries, {region_count} regions, {city_count} cities...')
temp_conn.commit()
self.stdout.write(f'✓ Parsing complete: {country_count} countries, {region_count} regions, {city_count} cities')
def _process_countries_from_temp(self, temp_conn, batch_size):
"""Process countries from temporary database"""
cursor = temp_conn.execute('SELECT country_code, name, subregion, capital, longitude, latitude FROM temp_countries')
countries_to_create = []
countries_to_update = []
processed = 0
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
# Batch check for existing countries
country_codes_in_batch = [row[0] for row in rows]
existing_countries = {
c.country_code: c for c in
Country.objects.filter(country_code__in=country_codes_in_batch)
.only('country_code', 'name', 'subregion', 'capital', 'longitude', 'latitude')
}
for row in rows:
country_code, name, subregion, capital, longitude, latitude = row
if country_code in existing_countries:
# Update existing
country_obj = existing_countries[country_code]
country_obj.name = name
country_obj.subregion = subregion
country_obj.capital = capital
country_obj.longitude = longitude
country_obj.latitude = latitude
countries_to_update.append(country_obj)
else:
countries_to_create.append(Country(
country_code=country_code,
name=name,
subregion=subregion,
capital=capital,
longitude=longitude,
latitude=latitude
))
processed += 1
# Flush batches
if countries_to_create:
with transaction.atomic():
Country.objects.bulk_create(countries_to_create, batch_size=batch_size, ignore_conflicts=True)
countries_to_create.clear()
if countries_to_update:
with transaction.atomic():
Country.objects.bulk_update(
countries_to_update,
['name', 'subregion', 'capital', 'longitude', 'latitude'],
batch_size=batch_size
)
countries_to_update.clear()
if processed % 1000 == 0:
self.stdout.write(f' Processed {processed} countries...')
gc.collect()
# Final flush
if countries_to_create:
with transaction.atomic():
Country.objects.bulk_create(countries_to_create, batch_size=batch_size, ignore_conflicts=True)
if countries_to_update:
with transaction.atomic():
Country.objects.bulk_update(
countries_to_update,
['name', 'subregion', 'capital', 'longitude', 'latitude'],
batch_size=batch_size
)
self.stdout.write(f'✓ Countries complete: {processed} processed')
def _process_regions_from_temp(self, temp_conn, batch_size):
"""Process regions from temporary database"""
# Get country mapping once
country_map = {c.country_code: c for c in Country.objects.only('id', 'country_code')}
cursor = temp_conn.execute('SELECT id, name, country_code, longitude, latitude FROM temp_regions')
regions_to_create = []
regions_to_update = []
processed = 0
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
# Batch check for existing regions
region_ids_in_batch = [row[0] for row in rows]
existing_regions = {
r.id: r for r in
Region.objects.filter(id__in=region_ids_in_batch)
.select_related('country')
.only('id', 'name', 'country', 'longitude', 'latitude')
}
for row in rows:
region_id, name, country_code, longitude, latitude = row
country_obj = country_map.get(country_code)
if not country_obj:
continue
if region_id in existing_regions:
# Update existing
region_obj = existing_regions[region_id]
region_obj.name = name
region_obj.country = country_obj
region_obj.longitude = longitude
region_obj.latitude = latitude
regions_to_update.append(region_obj)
else:
regions_to_create.append(Region(
id=region_id,
name=name,
country=country_obj,
longitude=longitude,
latitude=latitude
))
processed += 1
# Flush batches
if regions_to_create:
with transaction.atomic():
Region.objects.bulk_create(regions_to_create, batch_size=batch_size, ignore_conflicts=True)
regions_to_create.clear()
if regions_to_update:
with transaction.atomic():
Region.objects.bulk_update(
regions_to_update,
['name', 'country', 'longitude', 'latitude'],
batch_size=batch_size
)
regions_to_update.clear()
if processed % 2000 == 0:
self.stdout.write(f' Processed {processed} regions...')
gc.collect()
# Final flush
if regions_to_create:
with transaction.atomic():
Region.objects.bulk_create(regions_to_create, batch_size=batch_size, ignore_conflicts=True)
if regions_to_update:
with transaction.atomic():
Region.objects.bulk_update(
regions_to_update,
['name', 'country', 'longitude', 'latitude'],
batch_size=batch_size
)
self.stdout.write(f'✓ Regions complete: {processed} processed')
def _process_cities_from_temp(self, temp_conn, batch_size):
"""Process cities from temporary database with optimized existence checking"""
# Get region mapping once
region_map = {r.id: r for r in Region.objects.only('id')}
cursor = temp_conn.execute('SELECT id, name, region_id, longitude, latitude FROM temp_cities')
cities_to_create = []
cities_to_update = []
processed = 0
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
# Fast existence check - only get IDs, no objects
city_ids_in_batch = [row[0] for row in rows]
existing_city_ids = set(
City.objects.filter(id__in=city_ids_in_batch)
.values_list('id', flat=True)
)
for row in rows:
city_id, name, region_id, longitude, latitude = row
region_obj = region_map.get(region_id)
if not region_obj:
continue
if city_id in existing_city_ids:
# For updates, just store the data - we'll do bulk update by raw SQL
cities_to_update.append({
'id': city_id,
'name': name,
'region_id': region_obj.id,
'longitude': longitude,
'latitude': latitude
})
else:
cities_to_create.append(City(
id=city_id,
name=name,
region=region_obj,
longitude=longitude,
latitude=latitude
))
processed += 1
# Flush create batch (this is already fast)
if cities_to_create:
with transaction.atomic():
City.objects.bulk_create(cities_to_create, batch_size=batch_size, ignore_conflicts=True)
cities_to_create.clear()
# Flush update batch with raw SQL for speed
if cities_to_update:
self._bulk_update_cities_raw(cities_to_update)
cities_to_update.clear()
if processed % 5000 == 0:
self.stdout.write(f' Processed {processed} cities...')
gc.collect()
# Final flush
if cities_to_create:
with transaction.atomic():
City.objects.bulk_create(cities_to_create, batch_size=batch_size, ignore_conflicts=True)
if cities_to_update:
self._bulk_update_cities_raw(cities_to_update)
self.stdout.write(f'✓ Cities complete: {processed} processed')
def _bulk_update_cities_raw(self, cities_data):
"""Fast bulk update using raw SQL"""
if not cities_data:
return
from django.db import connection
with connection.cursor() as cursor:
# Build the SQL for bulk update
# Using CASE statements for efficient bulk updates
when_clauses_name = []
when_clauses_region = []
when_clauses_lng = []
when_clauses_lat = []
city_ids = []
for city in cities_data:
city_id = city['id']
city_ids.append(city_id)
when_clauses_name.append(f"WHEN id = %s THEN %s")
when_clauses_region.append(f"WHEN id = %s THEN %s")
when_clauses_lng.append(f"WHEN id = %s THEN %s")
when_clauses_lat.append(f"WHEN id = %s THEN %s")
# Build parameters list
params = []
for city in cities_data:
params.extend([city['id'], city['name']]) # for name
for city in cities_data:
params.extend([city['id'], city['region_id']]) # for region_id
for city in cities_data:
params.extend([city['id'], city['longitude']]) # for longitude
for city in cities_data:
params.extend([city['id'], city['latitude']]) # for latitude
params.extend(city_ids) # for WHERE clause
# Execute the bulk update
sql = f"""
UPDATE worldtravel_city
SET
name = CASE {' '.join(when_clauses_name)} END,
region_id = CASE {' '.join(when_clauses_region)} END,
longitude = CASE {' '.join(when_clauses_lng)} END,
latitude = CASE {' '.join(when_clauses_lat)} END
WHERE id IN ({','.join(['%s'] * len(city_ids))})
"""
cursor.execute(sql, params)
def _cleanup_obsolete_records(self, temp_conn):
"""Clean up obsolete records using temporary database"""
# Get IDs from temp database to avoid loading large lists into memory
temp_country_codes = {row[0] for row in temp_conn.execute('SELECT country_code FROM temp_countries')}
temp_region_ids = {row[0] for row in temp_conn.execute('SELECT id FROM temp_regions')}
temp_city_ids = {row[0] for row in temp_conn.execute('SELECT id FROM temp_cities')}
with transaction.atomic():
countries_deleted = Country.objects.exclude(country_code__in=temp_country_codes).count()
regions_deleted = Region.objects.exclude(id__in=temp_region_ids).count()
cities_deleted = City.objects.exclude(id__in=temp_city_ids).count()
Country.objects.exclude(country_code__in=temp_country_codes).delete()
Region.objects.exclude(id__in=temp_region_ids).delete()
City.objects.exclude(id__in=temp_city_ids).delete()
if countries_deleted > 0 or regions_deleted > 0 or cities_deleted > 0:
self.stdout.write(f'✓ Deleted {countries_deleted} obsolete countries, {regions_deleted} regions, {cities_deleted} cities')
else:
self.stdout.write('✓ No obsolete records found to delete')