1
0
Fork 0
mirror of https://github.com/seanmorley15/AdventureLog.git synced 2025-07-19 12:59:36 +02:00

Refactor download-countries command for improved memory efficiency and batch processing; enhance data import logic for countries, regions, and cities.

This commit is contained in:
Sean Morley 2025-05-26 20:39:24 -04:00
parent 575669aedf
commit f2246921d4

View file

@ -3,7 +3,6 @@ from django.core.management.base import BaseCommand
import requests import requests
from worldtravel.models import Country, Region, City from worldtravel.models import Country, Region, City
from django.db import transaction from django.db import transaction
from tqdm import tqdm
import ijson import ijson
import gc import gc
@ -44,7 +43,7 @@ class Command(BaseCommand):
def handle(self, **options): def handle(self, **options):
force = options['force'] force = options['force']
batch_size = 500 # Increased batch size for better performance batch_size = 1000 # Larger batch size for better efficiency
countries_json_path = os.path.join(settings.MEDIA_ROOT, f'countries+regions+states-{COUNTRY_REGION_JSON_VERSION}.json') countries_json_path = os.path.join(settings.MEDIA_ROOT, f'countries+regions+states-{COUNTRY_REGION_JSON_VERSION}.json')
if not os.path.exists(countries_json_path) or force: if not os.path.exists(countries_json_path) or force:
@ -68,41 +67,50 @@ class Command(BaseCommand):
self.stdout.write(self.style.SUCCESS('Latest country, region, and state data already downloaded.')) self.stdout.write(self.style.SUCCESS('Latest country, region, and state data already downloaded.'))
return return
# Use sets for faster lookup instead of dictionaries when we only need existence checks self.stdout.write(self.style.SUCCESS('Starting memory-efficient import process...'))
self.stdout.write(self.style.SUCCESS('Loading existing data for comparison...'))
existing_country_codes = set(Country.objects.values_list('country_code', flat=True))
existing_region_ids = set(Region.objects.values_list('id', flat=True))
existing_city_ids = set(City.objects.values_list('id', flat=True))
self.stdout.write(self.style.SUCCESS(f'Found {len(existing_country_codes)} existing countries, {len(existing_region_ids)} regions, {len(existing_city_ids)} cities')) # Pass 1: Process countries only
self.stdout.write(self.style.WARNING('Pass 1: Processing countries...'))
processed_country_codes = self._process_countries_pass(countries_json_path, batch_size)
# Only fetch full objects when we actually need to update them # Pass 2: Process regions only
existing_countries = {} self.stdout.write(self.style.WARNING('Pass 2: Processing regions...'))
existing_regions = {} processed_region_ids = self._process_regions_pass(countries_json_path, batch_size)
existing_cities = {}
# Pass 3: Process cities only
self.stdout.write(self.style.WARNING('Pass 3: Processing cities...'))
processed_city_ids = self._process_cities_pass(countries_json_path, batch_size)
# Clean up obsolete records
self.stdout.write(self.style.WARNING('Pass 4: Cleaning up obsolete records...'))
with transaction.atomic():
countries_deleted = Country.objects.exclude(country_code__in=processed_country_codes).count()
regions_deleted = Region.objects.exclude(id__in=processed_region_ids).count()
cities_deleted = City.objects.exclude(id__in=processed_city_ids).count()
Country.objects.exclude(country_code__in=processed_country_codes).delete()
Region.objects.exclude(id__in=processed_region_ids).delete()
City.objects.exclude(id__in=processed_city_ids).delete()
if countries_deleted > 0 or regions_deleted > 0 or cities_deleted > 0:
self.stdout.write(f' Deleted {countries_deleted} obsolete countries, {regions_deleted} regions, {cities_deleted} cities')
else:
self.stdout.write(' No obsolete records found to delete')
self.stdout.write(self.style.SUCCESS('All data imported successfully'))
def _process_countries_pass(self, json_path, batch_size):
"""First pass: Process only countries"""
existing_countries = {c.country_code: c for c in Country.objects.all()}
processed_country_codes = set() processed_country_codes = set()
processed_region_ids = set()
processed_city_ids = set()
# Process data in streaming fashion to avoid loading everything into memory countries_to_create = []
self.stdout.write(self.style.SUCCESS('Starting to process country data...')) countries_to_update = []
with open(countries_json_path, 'rb') as f: country_count = 0
with open(json_path, 'rb') as f:
parser = ijson.items(f, 'item') parser = ijson.items(f, 'item')
countries_to_create = []
regions_to_create = []
cities_to_create = []
countries_to_update = []
regions_to_update = []
cities_to_update = []
country_count = 0
total_regions_processed = 0
total_cities_processed = 0
batch_number = 1
for country in parser: for country in parser:
country_count += 1 country_count += 1
country_code = country['iso2'] country_code = country['iso2']
@ -112,16 +120,9 @@ class Command(BaseCommand):
longitude = round(float(country['longitude']), 6) if country['longitude'] else None longitude = round(float(country['longitude']), 6) if country['longitude'] else None
latitude = round(float(country['latitude']), 6) if country['latitude'] else None latitude = round(float(country['latitude']), 6) if country['latitude'] else None
if country_count % 10 == 0:
self.stdout.write(f'Processing country {country_count}: {country_name} ({country_code})')
processed_country_codes.add(country_code) processed_country_codes.add(country_code)
if country_code in existing_country_codes: if country_code in existing_countries:
# Only fetch when needed for updates
if country_code not in existing_countries:
existing_countries[country_code] = Country.objects.get(country_code=country_code)
country_obj = existing_countries[country_code] country_obj = existing_countries[country_code]
country_obj.name = country_name country_obj.name = country_name
country_obj.subregion = country_subregion country_obj.subregion = country_subregion
@ -140,11 +141,43 @@ class Command(BaseCommand):
) )
countries_to_create.append(country_obj) countries_to_create.append(country_obj)
# Download flag
saveCountryFlag(country_code) saveCountryFlag(country_code)
# Process states/regions # Process in batches to limit memory usage
region_count_for_country = 0 if len(countries_to_create) >= batch_size or len(countries_to_update) >= batch_size:
city_count_for_country = 0 self._flush_countries_batch(countries_to_create, countries_to_update, batch_size)
countries_to_create.clear()
countries_to_update.clear()
gc.collect()
if country_count % 50 == 0:
self.stdout.write(f' Processed {country_count} countries...')
# Process remaining countries
if countries_to_create or countries_to_update:
self._flush_countries_batch(countries_to_create, countries_to_update, batch_size)
self.stdout.write(f' Completed processing {country_count} countries')
return processed_country_codes
def _process_regions_pass(self, json_path, batch_size):
"""Second pass: Process only regions"""
existing_regions = {r.id: r for r in Region.objects.all()}
countries_dict = {c.country_code: c for c in Country.objects.all()}
processed_region_ids = set()
regions_to_create = []
regions_to_update = []
region_count = 0
with open(json_path, 'rb') as f:
parser = ijson.items(f, 'item')
for country in parser:
country_code = country['iso2']
country_name = country['name']
country_obj = countries_dict[country_code]
if country['states']: if country['states']:
for state in country['states']: for state in country['states']:
@ -157,13 +190,9 @@ class Command(BaseCommand):
continue continue
processed_region_ids.add(state_id) processed_region_ids.add(state_id)
region_count_for_country += 1 region_count += 1
total_regions_processed += 1
if state_id in existing_region_ids:
if state_id not in existing_regions:
existing_regions[state_id] = Region.objects.get(id=state_id)
if state_id in existing_regions:
region_obj = existing_regions[state_id] region_obj = existing_regions[state_id]
region_obj.name = name region_obj.name = name
region_obj.country = country_obj region_obj.country = country_obj
@ -180,7 +209,66 @@ class Command(BaseCommand):
) )
regions_to_create.append(region_obj) regions_to_create.append(region_obj)
# Process cities # Process in batches
if len(regions_to_create) >= batch_size or len(regions_to_update) >= batch_size:
self._flush_regions_batch(regions_to_create, regions_to_update, batch_size)
regions_to_create.clear()
regions_to_update.clear()
gc.collect()
else:
# Country without states - create a default region
state_id = f"{country_code}-00"
if state_id not in processed_region_ids:
processed_region_ids.add(state_id)
region_count += 1
if state_id in existing_regions:
region_obj = existing_regions[state_id]
region_obj.name = country_name
region_obj.country = country_obj
regions_to_update.append(region_obj)
else:
region_obj = Region(
id=state_id,
name=country_name,
country=country_obj
)
regions_to_create.append(region_obj)
if region_count % 1000 == 0 and region_count > 0:
self.stdout.write(f' Processed {region_count} regions...')
# Process remaining regions
if regions_to_create or regions_to_update:
self._flush_regions_batch(regions_to_create, regions_to_update, batch_size)
self.stdout.write(f' Completed processing {region_count} regions')
return processed_region_ids
def _process_cities_pass(self, json_path, batch_size):
"""Third pass: Process only cities"""
existing_cities = {c.id: c for c in City.objects.all()}
regions_dict = {r.id: r for r in Region.objects.all()}
processed_city_ids = set()
cities_to_create = []
cities_to_update = []
city_count = 0
with open(json_path, 'rb') as f:
parser = ijson.items(f, 'item')
for country in parser:
country_code = country['iso2']
if country['states']:
for state in country['states']:
state_id = f"{country_code}-{state['state_code']}"
region_obj = regions_dict.get(state_id)
if not region_obj:
continue
if 'cities' in state and len(state['cities']) > 0: if 'cities' in state and len(state['cities']) > 0:
for city in state['cities']: for city in state['cities']:
city_id = f"{state_id}-{city['id']}" city_id = f"{state_id}-{city['id']}"
@ -192,13 +280,9 @@ class Command(BaseCommand):
continue continue
processed_city_ids.add(city_id) processed_city_ids.add(city_id)
city_count_for_country += 1 city_count += 1
total_cities_processed += 1
if city_id in existing_city_ids:
if city_id not in existing_cities:
existing_cities[city_id] = City.objects.get(id=city_id)
if city_id in existing_cities:
city_obj = existing_cities[city_id] city_obj = existing_cities[city_id]
city_obj.name = city_name city_obj.name = city_name
city_obj.region = region_obj city_obj.region = region_obj
@ -214,164 +298,59 @@ class Command(BaseCommand):
latitude=latitude latitude=latitude
) )
cities_to_create.append(city_obj) cities_to_create.append(city_obj)
else:
# Country without states - create a default region
state_id = f"{country_code}-00"
processed_region_ids.add(state_id)
region_count_for_country = 1
total_regions_processed += 1
if state_id in existing_region_ids: # Process in batches
if state_id not in existing_regions: if len(cities_to_create) >= batch_size or len(cities_to_update) >= batch_size:
existing_regions[state_id] = Region.objects.get(id=state_id) self._flush_cities_batch(cities_to_create, cities_to_update, batch_size)
cities_to_create.clear()
cities_to_update.clear()
gc.collect()
region_obj = existing_regions[state_id] if city_count % 5000 == 0 and city_count > 0:
region_obj.name = country_name self.stdout.write(f' Processed {city_count} cities...')
region_obj.country = country_obj
regions_to_update.append(region_obj)
else:
region_obj = Region(
id=state_id,
name=country_name,
country=country_obj
)
regions_to_create.append(region_obj)
if country_count % 10 == 0: # Process remaining cities
self.stdout.write(f' └─ {country_name}: {region_count_for_country} regions, {city_count_for_country} cities') if cities_to_create or cities_to_update:
self._flush_cities_batch(cities_to_create, cities_to_update, batch_size)
# Process in batches during iteration to manage memory self.stdout.write(f' Completed processing {city_count} cities')
if country_count % 50 == 0: # Process every 50 countries return processed_city_ids
self.stdout.write(self.style.WARNING(f'Processing batch {batch_number} (countries {country_count-49}-{country_count})...'))
self.stdout.write(f' Countries to create: {len(countries_to_create)}, to update: {len(countries_to_update)}')
self.stdout.write(f' Regions to create: {len(regions_to_create)}, to update: {len(regions_to_update)}')
self.stdout.write(f' Cities to create: {len(cities_to_create)}, to update: {len(cities_to_update)}')
self._process_batches( def _flush_countries_batch(self, countries_to_create, countries_to_update, batch_size):
countries_to_create, regions_to_create, cities_to_create, """Flush countries batch to database"""
countries_to_update, regions_to_update, cities_to_update,
batch_size
)
self.stdout.write(self.style.SUCCESS(f'✓ Batch {batch_number} completed successfully'))
# Clear processed batches and force garbage collection
countries_to_create.clear()
regions_to_create.clear()
cities_to_create.clear()
countries_to_update.clear()
regions_to_update.clear()
cities_to_update.clear()
# Clear the cached objects to free memory
existing_countries.clear()
existing_regions.clear()
existing_cities.clear()
gc.collect()
batch_number += 1
# Process remaining batches
if countries_to_create or regions_to_create or cities_to_create or \
countries_to_update or regions_to_update or cities_to_update:
self.stdout.write(self.style.WARNING(f'Processing final batch {batch_number} (remaining {len(countries_to_create + countries_to_update)} countries)...'))
self.stdout.write(f' Countries to create: {len(countries_to_create)}, to update: {len(countries_to_update)}')
self.stdout.write(f' Regions to create: {len(regions_to_create)}, to update: {len(regions_to_update)}')
self.stdout.write(f' Cities to create: {len(cities_to_create)}, to update: {len(cities_to_update)}')
self._process_batches(
countries_to_create, regions_to_create, cities_to_create,
countries_to_update, regions_to_update, cities_to_update,
batch_size
)
self.stdout.write(self.style.SUCCESS(f'✓ Final batch completed successfully'))
self.stdout.write(self.style.SUCCESS(f'Finished processing {country_count} countries, {total_regions_processed} regions, {total_cities_processed} cities'))
# Clean up obsolete records
self.stdout.write(self.style.WARNING('Cleaning up obsolete records...'))
with transaction.atomic(): with transaction.atomic():
countries_deleted = Country.objects.exclude(country_code__in=processed_country_codes).count()
regions_deleted = Region.objects.exclude(id__in=processed_region_ids).count()
cities_deleted = City.objects.exclude(id__in=processed_city_ids).count()
Country.objects.exclude(country_code__in=processed_country_codes).delete()
Region.objects.exclude(id__in=processed_region_ids).delete()
City.objects.exclude(id__in=processed_city_ids).delete()
if countries_deleted > 0 or regions_deleted > 0 or cities_deleted > 0:
self.stdout.write(f' Deleted {countries_deleted} obsolete countries, {regions_deleted} regions, {cities_deleted} cities')
else:
self.stdout.write(' No obsolete records found to delete')
self.stdout.write(self.style.SUCCESS('All data imported successfully'))
def _process_batches(self, countries_to_create, regions_to_create, cities_to_create,
countries_to_update, regions_to_update, cities_to_update, batch_size):
"""Process all pending batches in a single transaction, safely"""
with transaction.atomic():
# 1. Create new countries
if countries_to_create: if countries_to_create:
self.stdout.write(f' Creating {len(countries_to_create)} countries in batches of {batch_size}...')
for i in range(0, len(countries_to_create), batch_size): for i in range(0, len(countries_to_create), batch_size):
batch = countries_to_create[i:i + batch_size] batch = countries_to_create[i:i + batch_size]
Country.objects.bulk_create(batch, ignore_conflicts=True) Country.objects.bulk_create(batch, ignore_conflicts=True)
# 2. Re-fetch the now-saved countries from the DB
saved_country_map = {
c.country_code: c for c in Country.objects.filter(
country_code__in=[c.country_code for c in countries_to_create]
)
}
# 3. Rebind Region.country to actual saved Country objects
for region in regions_to_create:
if isinstance(region.country, Country):
region.country = saved_country_map.get(region.country.country_code)
# 4. Create new regions
if regions_to_create:
self.stdout.write(f' Creating {len(regions_to_create)} regions in batches of {batch_size}...')
for i in range(0, len(regions_to_create), batch_size):
batch = regions_to_create[i:i + batch_size]
Region.objects.bulk_create(batch, ignore_conflicts=True)
# 5. Re-fetch the now-saved regions from the DB
saved_region_map = {
r.id: r for r in Region.objects.filter(
id__in=[r.id for r in regions_to_create]
)
}
# 6. Rebind City.region to actual saved Region objects
for city in cities_to_create:
if isinstance(city.region, Region):
city.region = saved_region_map.get(city.region.id)
# 7. Create new cities
if cities_to_create:
self.stdout.write(f' Creating {len(cities_to_create)} cities in batches of {batch_size}...')
for i in range(0, len(cities_to_create), batch_size):
batch = cities_to_create[i:i + batch_size]
City.objects.bulk_create(batch, ignore_conflicts=True)
# 8. Update existing countries
if countries_to_update: if countries_to_update:
self.stdout.write(f' Updating {len(countries_to_update)} countries in batches of {batch_size}...')
for i in range(0, len(countries_to_update), batch_size): for i in range(0, len(countries_to_update), batch_size):
batch = countries_to_update[i:i + batch_size] batch = countries_to_update[i:i + batch_size]
Country.objects.bulk_update(batch, ['name', 'subregion', 'capital', 'longitude', 'latitude']) Country.objects.bulk_update(batch, ['name', 'subregion', 'capital', 'longitude', 'latitude'])
# 9. Update existing regions def _flush_regions_batch(self, regions_to_create, regions_to_update, batch_size):
"""Flush regions batch to database"""
with transaction.atomic():
if regions_to_create:
for i in range(0, len(regions_to_create), batch_size):
batch = regions_to_create[i:i + batch_size]
Region.objects.bulk_create(batch, ignore_conflicts=True)
if regions_to_update: if regions_to_update:
self.stdout.write(f' Updating {len(regions_to_update)} regions in batches of {batch_size}...')
for i in range(0, len(regions_to_update), batch_size): for i in range(0, len(regions_to_update), batch_size):
batch = regions_to_update[i:i + batch_size] batch = regions_to_update[i:i + batch_size]
Region.objects.bulk_update(batch, ['name', 'country', 'longitude', 'latitude']) Region.objects.bulk_update(batch, ['name', 'country', 'longitude', 'latitude'])
# 10. Update existing cities def _flush_cities_batch(self, cities_to_create, cities_to_update, batch_size):
"""Flush cities batch to database"""
with transaction.atomic():
if cities_to_create:
for i in range(0, len(cities_to_create), batch_size):
batch = cities_to_create[i:i + batch_size]
City.objects.bulk_create(batch, ignore_conflicts=True)
if cities_to_update: if cities_to_update:
self.stdout.write(f' Updating {len(cities_to_update)} cities in batches of {batch_size}...')
for i in range(0, len(cities_to_update), batch_size): for i in range(0, len(cities_to_update), batch_size):
batch = cities_to_update[i:i + batch_size] batch = cities_to_update[i:i + batch_size]
City.objects.bulk_update(batch, ['name', 'region', 'longitude', 'latitude']) City.objects.bulk_update(batch, ['name', 'region', 'longitude', 'latitude'])