From f2246921d49d7f1d5dadbbbbef96b4a20fd497d7 Mon Sep 17 00:00:00 2001 From: Sean Morley Date: Mon, 26 May 2025 20:39:24 -0400 Subject: [PATCH] Refactor download-countries command for improved memory efficiency and batch processing; enhance data import logic for countries, regions, and cities. --- .../management/commands/download-countries.py | 363 +++++++++--------- 1 file changed, 171 insertions(+), 192 deletions(-) diff --git a/backend/server/worldtravel/management/commands/download-countries.py b/backend/server/worldtravel/management/commands/download-countries.py index b1b4dac..945cfd9 100644 --- a/backend/server/worldtravel/management/commands/download-countries.py +++ b/backend/server/worldtravel/management/commands/download-countries.py @@ -3,7 +3,6 @@ from django.core.management.base import BaseCommand import requests from worldtravel.models import Country, Region, City from django.db import transaction -from tqdm import tqdm import ijson import gc @@ -44,7 +43,7 @@ class Command(BaseCommand): def handle(self, **options): force = options['force'] - batch_size = 500 # Increased batch size for better performance + batch_size = 1000 # Larger batch size for better efficiency countries_json_path = os.path.join(settings.MEDIA_ROOT, f'countries+regions+states-{COUNTRY_REGION_JSON_VERSION}.json') if not os.path.exists(countries_json_path) or force: @@ -68,41 +67,50 @@ class Command(BaseCommand): self.stdout.write(self.style.SUCCESS('Latest country, region, and state data already downloaded.')) return - # Use sets for faster lookup instead of dictionaries when we only need existence checks - self.stdout.write(self.style.SUCCESS('Loading existing data for comparison...')) - existing_country_codes = set(Country.objects.values_list('country_code', flat=True)) - existing_region_ids = set(Region.objects.values_list('id', flat=True)) - existing_city_ids = set(City.objects.values_list('id', flat=True)) + self.stdout.write(self.style.SUCCESS('Starting memory-efficient import process...')) + + # Pass 1: Process countries only + self.stdout.write(self.style.WARNING('Pass 1: Processing countries...')) + processed_country_codes = self._process_countries_pass(countries_json_path, batch_size) - self.stdout.write(self.style.SUCCESS(f'Found {len(existing_country_codes)} existing countries, {len(existing_region_ids)} regions, {len(existing_city_ids)} cities')) + # Pass 2: Process regions only + self.stdout.write(self.style.WARNING('Pass 2: Processing regions...')) + processed_region_ids = self._process_regions_pass(countries_json_path, batch_size) + + # Pass 3: Process cities only + self.stdout.write(self.style.WARNING('Pass 3: Processing cities...')) + processed_city_ids = self._process_cities_pass(countries_json_path, batch_size) - # Only fetch full objects when we actually need to update them - existing_countries = {} - existing_regions = {} - existing_cities = {} + # Clean up obsolete records + self.stdout.write(self.style.WARNING('Pass 4: Cleaning up obsolete records...')) + with transaction.atomic(): + countries_deleted = Country.objects.exclude(country_code__in=processed_country_codes).count() + regions_deleted = Region.objects.exclude(id__in=processed_region_ids).count() + cities_deleted = City.objects.exclude(id__in=processed_city_ids).count() + + Country.objects.exclude(country_code__in=processed_country_codes).delete() + Region.objects.exclude(id__in=processed_region_ids).delete() + City.objects.exclude(id__in=processed_city_ids).delete() + + if countries_deleted > 0 or regions_deleted > 0 or cities_deleted > 0: + self.stdout.write(f' Deleted {countries_deleted} obsolete countries, {regions_deleted} regions, {cities_deleted} cities') + else: + self.stdout.write(' No obsolete records found to delete') + self.stdout.write(self.style.SUCCESS('All data imported successfully')) + + def _process_countries_pass(self, json_path, batch_size): + """First pass: Process only countries""" + existing_countries = {c.country_code: c for c in Country.objects.all()} processed_country_codes = set() - processed_region_ids = set() - processed_city_ids = set() - - # Process data in streaming fashion to avoid loading everything into memory - self.stdout.write(self.style.SUCCESS('Starting to process country data...')) - with open(countries_json_path, 'rb') as f: + + countries_to_create = [] + countries_to_update = [] + country_count = 0 + + with open(json_path, 'rb') as f: parser = ijson.items(f, 'item') - countries_to_create = [] - regions_to_create = [] - cities_to_create = [] - - countries_to_update = [] - regions_to_update = [] - cities_to_update = [] - - country_count = 0 - total_regions_processed = 0 - total_cities_processed = 0 - batch_number = 1 - for country in parser: country_count += 1 country_code = country['iso2'] @@ -112,16 +120,9 @@ class Command(BaseCommand): longitude = round(float(country['longitude']), 6) if country['longitude'] else None latitude = round(float(country['latitude']), 6) if country['latitude'] else None - if country_count % 10 == 0: - self.stdout.write(f'Processing country {country_count}: {country_name} ({country_code})') - processed_country_codes.add(country_code) - if country_code in existing_country_codes: - # Only fetch when needed for updates - if country_code not in existing_countries: - existing_countries[country_code] = Country.objects.get(country_code=country_code) - + if country_code in existing_countries: country_obj = existing_countries[country_code] country_obj.name = country_name country_obj.subregion = country_subregion @@ -140,11 +141,43 @@ class Command(BaseCommand): ) countries_to_create.append(country_obj) + # Download flag saveCountryFlag(country_code) - # Process states/regions - region_count_for_country = 0 - city_count_for_country = 0 + # Process in batches to limit memory usage + if len(countries_to_create) >= batch_size or len(countries_to_update) >= batch_size: + self._flush_countries_batch(countries_to_create, countries_to_update, batch_size) + countries_to_create.clear() + countries_to_update.clear() + gc.collect() + + if country_count % 50 == 0: + self.stdout.write(f' Processed {country_count} countries...') + + # Process remaining countries + if countries_to_create or countries_to_update: + self._flush_countries_batch(countries_to_create, countries_to_update, batch_size) + + self.stdout.write(f' Completed processing {country_count} countries') + return processed_country_codes + + def _process_regions_pass(self, json_path, batch_size): + """Second pass: Process only regions""" + existing_regions = {r.id: r for r in Region.objects.all()} + countries_dict = {c.country_code: c for c in Country.objects.all()} + processed_region_ids = set() + + regions_to_create = [] + regions_to_update = [] + region_count = 0 + + with open(json_path, 'rb') as f: + parser = ijson.items(f, 'item') + + for country in parser: + country_code = country['iso2'] + country_name = country['name'] + country_obj = countries_dict[country_code] if country['states']: for state in country['states']: @@ -157,13 +190,9 @@ class Command(BaseCommand): continue processed_region_ids.add(state_id) - region_count_for_country += 1 - total_regions_processed += 1 + region_count += 1 - if state_id in existing_region_ids: - if state_id not in existing_regions: - existing_regions[state_id] = Region.objects.get(id=state_id) - + if state_id in existing_regions: region_obj = existing_regions[state_id] region_obj.name = name region_obj.country = country_obj @@ -180,7 +209,66 @@ class Command(BaseCommand): ) regions_to_create.append(region_obj) - # Process cities + # Process in batches + if len(regions_to_create) >= batch_size or len(regions_to_update) >= batch_size: + self._flush_regions_batch(regions_to_create, regions_to_update, batch_size) + regions_to_create.clear() + regions_to_update.clear() + gc.collect() + else: + # Country without states - create a default region + state_id = f"{country_code}-00" + if state_id not in processed_region_ids: + processed_region_ids.add(state_id) + region_count += 1 + + if state_id in existing_regions: + region_obj = existing_regions[state_id] + region_obj.name = country_name + region_obj.country = country_obj + regions_to_update.append(region_obj) + else: + region_obj = Region( + id=state_id, + name=country_name, + country=country_obj + ) + regions_to_create.append(region_obj) + + if region_count % 1000 == 0 and region_count > 0: + self.stdout.write(f' Processed {region_count} regions...') + + # Process remaining regions + if regions_to_create or regions_to_update: + self._flush_regions_batch(regions_to_create, regions_to_update, batch_size) + + self.stdout.write(f' Completed processing {region_count} regions') + return processed_region_ids + + def _process_cities_pass(self, json_path, batch_size): + """Third pass: Process only cities""" + existing_cities = {c.id: c for c in City.objects.all()} + regions_dict = {r.id: r for r in Region.objects.all()} + processed_city_ids = set() + + cities_to_create = [] + cities_to_update = [] + city_count = 0 + + with open(json_path, 'rb') as f: + parser = ijson.items(f, 'item') + + for country in parser: + country_code = country['iso2'] + + if country['states']: + for state in country['states']: + state_id = f"{country_code}-{state['state_code']}" + region_obj = regions_dict.get(state_id) + + if not region_obj: + continue + if 'cities' in state and len(state['cities']) > 0: for city in state['cities']: city_id = f"{state_id}-{city['id']}" @@ -192,13 +280,9 @@ class Command(BaseCommand): continue processed_city_ids.add(city_id) - city_count_for_country += 1 - total_cities_processed += 1 + city_count += 1 - if city_id in existing_city_ids: - if city_id not in existing_cities: - existing_cities[city_id] = City.objects.get(id=city_id) - + if city_id in existing_cities: city_obj = existing_cities[city_id] city_obj.name = city_name city_obj.region = region_obj @@ -214,164 +298,59 @@ class Command(BaseCommand): latitude=latitude ) cities_to_create.append(city_obj) - else: - # Country without states - create a default region - state_id = f"{country_code}-00" - processed_region_ids.add(state_id) - region_count_for_country = 1 - total_regions_processed += 1 - - if state_id in existing_region_ids: - if state_id not in existing_regions: - existing_regions[state_id] = Region.objects.get(id=state_id) - - region_obj = existing_regions[state_id] - region_obj.name = country_name - region_obj.country = country_obj - regions_to_update.append(region_obj) - else: - region_obj = Region( - id=state_id, - name=country_name, - country=country_obj - ) - regions_to_create.append(region_obj) - if country_count % 10 == 0: - self.stdout.write(f' └─ {country_name}: {region_count_for_country} regions, {city_count_for_country} cities') + # Process in batches + if len(cities_to_create) >= batch_size or len(cities_to_update) >= batch_size: + self._flush_cities_batch(cities_to_create, cities_to_update, batch_size) + cities_to_create.clear() + cities_to_update.clear() + gc.collect() - # Process in batches during iteration to manage memory - if country_count % 50 == 0: # Process every 50 countries - self.stdout.write(self.style.WARNING(f'Processing batch {batch_number} (countries {country_count-49}-{country_count})...')) - self.stdout.write(f' Countries to create: {len(countries_to_create)}, to update: {len(countries_to_update)}') - self.stdout.write(f' Regions to create: {len(regions_to_create)}, to update: {len(regions_to_update)}') - self.stdout.write(f' Cities to create: {len(cities_to_create)}, to update: {len(cities_to_update)}') - - self._process_batches( - countries_to_create, regions_to_create, cities_to_create, - countries_to_update, regions_to_update, cities_to_update, - batch_size - ) - - self.stdout.write(self.style.SUCCESS(f'✓ Batch {batch_number} completed successfully')) - - # Clear processed batches and force garbage collection - countries_to_create.clear() - regions_to_create.clear() - cities_to_create.clear() - countries_to_update.clear() - regions_to_update.clear() - cities_to_update.clear() - - # Clear the cached objects to free memory - existing_countries.clear() - existing_regions.clear() - existing_cities.clear() - - gc.collect() - batch_number += 1 + if city_count % 5000 == 0 and city_count > 0: + self.stdout.write(f' Processed {city_count} cities...') - # Process remaining batches - if countries_to_create or regions_to_create or cities_to_create or \ - countries_to_update or regions_to_update or cities_to_update: - self.stdout.write(self.style.WARNING(f'Processing final batch {batch_number} (remaining {len(countries_to_create + countries_to_update)} countries)...')) - self.stdout.write(f' Countries to create: {len(countries_to_create)}, to update: {len(countries_to_update)}') - self.stdout.write(f' Regions to create: {len(regions_to_create)}, to update: {len(regions_to_update)}') - self.stdout.write(f' Cities to create: {len(cities_to_create)}, to update: {len(cities_to_update)}') - - self._process_batches( - countries_to_create, regions_to_create, cities_to_create, - countries_to_update, regions_to_update, cities_to_update, - batch_size - ) - self.stdout.write(self.style.SUCCESS(f'✓ Final batch completed successfully')) + # Process remaining cities + if cities_to_create or cities_to_update: + self._flush_cities_batch(cities_to_create, cities_to_update, batch_size) - self.stdout.write(self.style.SUCCESS(f'Finished processing {country_count} countries, {total_regions_processed} regions, {total_cities_processed} cities')) + self.stdout.write(f' Completed processing {city_count} cities') + return processed_city_ids - # Clean up obsolete records - self.stdout.write(self.style.WARNING('Cleaning up obsolete records...')) + def _flush_countries_batch(self, countries_to_create, countries_to_update, batch_size): + """Flush countries batch to database""" with transaction.atomic(): - countries_deleted = Country.objects.exclude(country_code__in=processed_country_codes).count() - regions_deleted = Region.objects.exclude(id__in=processed_region_ids).count() - cities_deleted = City.objects.exclude(id__in=processed_city_ids).count() - - Country.objects.exclude(country_code__in=processed_country_codes).delete() - Region.objects.exclude(id__in=processed_region_ids).delete() - City.objects.exclude(id__in=processed_city_ids).delete() - - if countries_deleted > 0 or regions_deleted > 0 or cities_deleted > 0: - self.stdout.write(f' Deleted {countries_deleted} obsolete countries, {regions_deleted} regions, {cities_deleted} cities') - else: - self.stdout.write(' No obsolete records found to delete') - - self.stdout.write(self.style.SUCCESS('All data imported successfully')) - - def _process_batches(self, countries_to_create, regions_to_create, cities_to_create, - countries_to_update, regions_to_update, cities_to_update, batch_size): - """Process all pending batches in a single transaction, safely""" - with transaction.atomic(): - # 1. Create new countries if countries_to_create: - self.stdout.write(f' Creating {len(countries_to_create)} countries in batches of {batch_size}...') for i in range(0, len(countries_to_create), batch_size): batch = countries_to_create[i:i + batch_size] Country.objects.bulk_create(batch, ignore_conflicts=True) - # 2. Re-fetch the now-saved countries from the DB - saved_country_map = { - c.country_code: c for c in Country.objects.filter( - country_code__in=[c.country_code for c in countries_to_create] - ) - } - - # 3. Rebind Region.country to actual saved Country objects - for region in regions_to_create: - if isinstance(region.country, Country): - region.country = saved_country_map.get(region.country.country_code) - - # 4. Create new regions - if regions_to_create: - self.stdout.write(f' Creating {len(regions_to_create)} regions in batches of {batch_size}...') - for i in range(0, len(regions_to_create), batch_size): - batch = regions_to_create[i:i + batch_size] - Region.objects.bulk_create(batch, ignore_conflicts=True) - - # 5. Re-fetch the now-saved regions from the DB - saved_region_map = { - r.id: r for r in Region.objects.filter( - id__in=[r.id for r in regions_to_create] - ) - } - - # 6. Rebind City.region to actual saved Region objects - for city in cities_to_create: - if isinstance(city.region, Region): - city.region = saved_region_map.get(city.region.id) - - # 7. Create new cities - if cities_to_create: - self.stdout.write(f' Creating {len(cities_to_create)} cities in batches of {batch_size}...') - for i in range(0, len(cities_to_create), batch_size): - batch = cities_to_create[i:i + batch_size] - City.objects.bulk_create(batch, ignore_conflicts=True) - - # 8. Update existing countries if countries_to_update: - self.stdout.write(f' Updating {len(countries_to_update)} countries in batches of {batch_size}...') for i in range(0, len(countries_to_update), batch_size): batch = countries_to_update[i:i + batch_size] Country.objects.bulk_update(batch, ['name', 'subregion', 'capital', 'longitude', 'latitude']) - # 9. Update existing regions + def _flush_regions_batch(self, regions_to_create, regions_to_update, batch_size): + """Flush regions batch to database""" + with transaction.atomic(): + if regions_to_create: + for i in range(0, len(regions_to_create), batch_size): + batch = regions_to_create[i:i + batch_size] + Region.objects.bulk_create(batch, ignore_conflicts=True) + if regions_to_update: - self.stdout.write(f' Updating {len(regions_to_update)} regions in batches of {batch_size}...') for i in range(0, len(regions_to_update), batch_size): batch = regions_to_update[i:i + batch_size] Region.objects.bulk_update(batch, ['name', 'country', 'longitude', 'latitude']) - # 10. Update existing cities + def _flush_cities_batch(self, cities_to_create, cities_to_update, batch_size): + """Flush cities batch to database""" + with transaction.atomic(): + if cities_to_create: + for i in range(0, len(cities_to_create), batch_size): + batch = cities_to_create[i:i + batch_size] + City.objects.bulk_create(batch, ignore_conflicts=True) + if cities_to_update: - self.stdout.write(f' Updating {len(cities_to_update)} cities in batches of {batch_size}...') for i in range(0, len(cities_to_update), batch_size): batch = cities_to_update[i:i + batch_size] - City.objects.bulk_update(batch, ['name', 'region', 'longitude', 'latitude']) + City.objects.bulk_update(batch, ['name', 'region', 'longitude', 'latitude']) \ No newline at end of file