1
0
Fork 0
mirror of https://github.com/seanmorley15/AdventureLog.git synced 2025-07-29 17:59:36 +02:00

Optimize country data import process: increase batch size, implement memory management, and streamline record creation and updates

This commit is contained in:
Sean Morley 2025-05-24 19:05:16 -04:00
parent 9e304f81fe
commit c123231bab

View file

@ -5,6 +5,7 @@ from worldtravel.models import Country, Region, City
from django.db import transaction from django.db import transaction
from tqdm import tqdm from tqdm import tqdm
import ijson import ijson
import gc
from django.conf import settings from django.conf import settings
@ -43,8 +44,9 @@ class Command(BaseCommand):
def handle(self, **options): def handle(self, **options):
force = options['force'] force = options['force']
batch_size = 100 batch_size = 500 # Increased batch size for better performance
countries_json_path = os.path.join(settings.MEDIA_ROOT, f'countries+regions+states-{COUNTRY_REGION_JSON_VERSION}.json') countries_json_path = os.path.join(settings.MEDIA_ROOT, f'countries+regions+states-{COUNTRY_REGION_JSON_VERSION}.json')
if not os.path.exists(countries_json_path) or force: if not os.path.exists(countries_json_path) or force:
res = requests.get(f'https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/{COUNTRY_REGION_JSON_VERSION}/json/countries%2Bstates%2Bcities.json') res = requests.get(f'https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/{COUNTRY_REGION_JSON_VERSION}/json/countries%2Bstates%2Bcities.json')
if res.status_code == 200: if res.status_code == 200:
@ -64,28 +66,44 @@ class Command(BaseCommand):
else: else:
self.stdout.write(self.style.SUCCESS('Latest country, region, and state data already downloaded.')) self.stdout.write(self.style.SUCCESS('Latest country, region, and state data already downloaded.'))
return return
with open(countries_json_path, 'r') as f: # Use sets for faster lookup instead of dictionaries when we only need existence checks
f = open(countries_json_path, 'rb') self.stdout.write(self.style.SUCCESS('Loading existing data for comparison...'))
existing_country_codes = set(Country.objects.values_list('country_code', flat=True))
existing_region_ids = set(Region.objects.values_list('id', flat=True))
existing_city_ids = set(City.objects.values_list('id', flat=True))
self.stdout.write(self.style.SUCCESS(f'Found {len(existing_country_codes)} existing countries, {len(existing_region_ids)} regions, {len(existing_city_ids)} cities'))
# Only fetch full objects when we actually need to update them
existing_countries = {}
existing_regions = {}
existing_cities = {}
processed_country_codes = set()
processed_region_ids = set()
processed_city_ids = set()
# Process data in streaming fashion to avoid loading everything into memory
self.stdout.write(self.style.SUCCESS('Starting to process country data...'))
with open(countries_json_path, 'rb') as f:
parser = ijson.items(f, 'item') parser = ijson.items(f, 'item')
with transaction.atomic():
existing_countries = {country.country_code: country for country in Country.objects.all()}
existing_regions = {region.id: region for region in Region.objects.all()}
existing_cities = {city.id: city for city in City.objects.all()}
countries_to_create = [] countries_to_create = []
regions_to_create = [] regions_to_create = []
cities_to_create = []
countries_to_update = [] countries_to_update = []
regions_to_update = [] regions_to_update = []
cities_to_create = []
cities_to_update = [] cities_to_update = []
processed_country_codes = set() country_count = 0
processed_region_ids = set() total_regions_processed = 0
processed_city_ids = set() total_cities_processed = 0
batch_number = 1
for country in parser: for country in parser:
country_count += 1
country_code = country['iso2'] country_code = country['iso2']
country_name = country['name'] country_name = country['name']
country_subregion = country['subregion'] country_subregion = country['subregion']
@ -93,9 +111,16 @@ class Command(BaseCommand):
longitude = round(float(country['longitude']), 6) if country['longitude'] else None longitude = round(float(country['longitude']), 6) if country['longitude'] else None
latitude = round(float(country['latitude']), 6) if country['latitude'] else None latitude = round(float(country['latitude']), 6) if country['latitude'] else None
if country_count % 10 == 0:
self.stdout.write(f'Processing country {country_count}: {country_name} ({country_code})')
processed_country_codes.add(country_code) processed_country_codes.add(country_code)
if country_code in existing_countries: if country_code in existing_country_codes:
# Only fetch when needed for updates
if country_code not in existing_countries:
existing_countries[country_code] = Country.objects.get(country_code=country_code)
country_obj = existing_countries[country_code] country_obj = existing_countries[country_code]
country_obj.name = country_name country_obj.name = country_name
country_obj.subregion = country_subregion country_obj.subregion = country_subregion
@ -116,6 +141,10 @@ class Command(BaseCommand):
saveCountryFlag(country_code) saveCountryFlag(country_code)
# Process states/regions
region_count_for_country = 0
city_count_for_country = 0
if country['states']: if country['states']:
for state in country['states']: for state in country['states']:
name = state['name'] name = state['name']
@ -123,14 +152,17 @@ class Command(BaseCommand):
latitude = round(float(state['latitude']), 6) if state['latitude'] else None latitude = round(float(state['latitude']), 6) if state['latitude'] else None
longitude = round(float(state['longitude']), 6) if state['longitude'] else None longitude = round(float(state['longitude']), 6) if state['longitude'] else None
# Check for duplicate regions
if state_id in processed_region_ids: if state_id in processed_region_ids:
# self.stdout.write(self.style.ERROR(f'State {state_id} already processed'))
continue continue
processed_region_ids.add(state_id) processed_region_ids.add(state_id)
region_count_for_country += 1
total_regions_processed += 1
if state_id in existing_regions: if state_id in existing_region_ids:
if state_id not in existing_regions:
existing_regions[state_id] = Region.objects.get(id=state_id)
region_obj = existing_regions[state_id] region_obj = existing_regions[state_id]
region_obj.name = name region_obj.name = name
region_obj.country = country_obj region_obj.country = country_obj
@ -146,8 +178,8 @@ class Command(BaseCommand):
latitude=latitude latitude=latitude
) )
regions_to_create.append(region_obj) regions_to_create.append(region_obj)
# self.stdout.write(self.style.SUCCESS(f'State {state_id} prepared'))
# Process cities
if 'cities' in state and len(state['cities']) > 0: if 'cities' in state and len(state['cities']) > 0:
for city in state['cities']: for city in state['cities']:
city_id = f"{state_id}-{city['id']}" city_id = f"{state_id}-{city['id']}"
@ -155,14 +187,17 @@ class Command(BaseCommand):
latitude = round(float(city['latitude']), 6) if city['latitude'] else None latitude = round(float(city['latitude']), 6) if city['latitude'] else None
longitude = round(float(city['longitude']), 6) if city['longitude'] else None longitude = round(float(city['longitude']), 6) if city['longitude'] else None
# Check for duplicate cities
if city_id in processed_city_ids: if city_id in processed_city_ids:
# self.stdout.write(self.style.ERROR(f'City {city_id} already processed'))
continue continue
processed_city_ids.add(city_id) processed_city_ids.add(city_id)
city_count_for_country += 1
total_cities_processed += 1
if city_id in existing_cities: if city_id in existing_city_ids:
if city_id not in existing_cities:
existing_cities[city_id] = City.objects.get(id=city_id)
city_obj = existing_cities[city_id] city_obj = existing_cities[city_id]
city_obj.name = city_name city_obj.name = city_name
city_obj.region = region_obj city_obj.region = region_obj
@ -178,12 +213,17 @@ class Command(BaseCommand):
latitude=latitude latitude=latitude
) )
cities_to_create.append(city_obj) cities_to_create.append(city_obj)
# self.stdout.write(self.style.SUCCESS(f'City {city_id} prepared'))
else: else:
# Country without states - create a default region
state_id = f"{country_code}-00" state_id = f"{country_code}-00"
processed_region_ids.add(state_id) processed_region_ids.add(state_id)
if state_id in existing_regions: region_count_for_country = 1
total_regions_processed += 1
if state_id in existing_region_ids:
if state_id not in existing_regions:
existing_regions[state_id] = Region.objects.get(id=state_id)
region_obj = existing_regions[state_id] region_obj = existing_regions[state_id]
region_obj.name = country_name region_obj.name = country_name
region_obj.country = country_obj region_obj.country = country_obj
@ -195,35 +235,114 @@ class Command(BaseCommand):
country=country_obj country=country_obj
) )
regions_to_create.append(region_obj) regions_to_create.append(region_obj)
# self.stdout.write(self.style.SUCCESS(f'Region {state_id} prepared for {country_name}'))
for i in tqdm(range(0, len(countries_to_create), batch_size), desc="Processing countries"):
batch = countries_to_create[i:i + batch_size]
Country.objects.bulk_create(batch)
for i in tqdm(range(0, len(regions_to_create), batch_size), desc="Processing regions"): if country_count % 10 == 0:
batch = regions_to_create[i:i + batch_size] self.stdout.write(f' └─ {country_name}: {region_count_for_country} regions, {city_count_for_country} cities')
Region.objects.bulk_create(batch)
for i in tqdm(range(0, len(cities_to_create), batch_size), desc="Processing cities"): # Process in batches during iteration to manage memory
batch = cities_to_create[i:i + batch_size] if country_count % 50 == 0: # Process every 50 countries
City.objects.bulk_create(batch) self.stdout.write(self.style.WARNING(f'Processing batch {batch_number} (countries {country_count-49}-{country_count})...'))
self.stdout.write(f' Countries to create: {len(countries_to_create)}, to update: {len(countries_to_update)}')
self.stdout.write(f' Regions to create: {len(regions_to_create)}, to update: {len(regions_to_update)}')
self.stdout.write(f' Cities to create: {len(cities_to_create)}, to update: {len(cities_to_update)}')
self._process_batches(
countries_to_create, regions_to_create, cities_to_create,
countries_to_update, regions_to_update, cities_to_update,
batch_size
)
self.stdout.write(self.style.SUCCESS(f'✓ Batch {batch_number} completed successfully'))
# Clear processed batches and force garbage collection
countries_to_create.clear()
regions_to_create.clear()
cities_to_create.clear()
countries_to_update.clear()
regions_to_update.clear()
cities_to_update.clear()
# Clear the cached objects to free memory
existing_countries.clear()
existing_regions.clear()
existing_cities.clear()
gc.collect()
batch_number += 1
# Process updates in batches # Process remaining batches
for i in range(0, len(countries_to_update), batch_size): if countries_to_create or regions_to_create or cities_to_create or \
batch = countries_to_update[i:i + batch_size] countries_to_update or regions_to_update or cities_to_update:
for i in tqdm(range(0, len(countries_to_update), batch_size), desc="Updating countries"): self.stdout.write(self.style.WARNING(f'Processing final batch {batch_number} (remaining {len(countries_to_create + countries_to_update)} countries)...'))
batch = countries_to_update[i:i + batch_size] self.stdout.write(f' Countries to create: {len(countries_to_create)}, to update: {len(countries_to_update)}')
Country.objects.bulk_update(batch, ['name', 'subregion', 'capital', 'longitude', 'latitude']) self.stdout.write(f' Regions to create: {len(regions_to_create)}, to update: {len(regions_to_update)}')
self.stdout.write(f' Cities to create: {len(cities_to_create)}, to update: {len(cities_to_update)}')
self._process_batches(
countries_to_create, regions_to_create, cities_to_create,
countries_to_update, regions_to_update, cities_to_update,
batch_size
)
self.stdout.write(self.style.SUCCESS(f'✓ Final batch completed successfully'))
for i in tqdm(range(0, len(regions_to_update), batch_size), desc="Updating regions"): self.stdout.write(self.style.SUCCESS(f'Finished processing {country_count} countries, {total_regions_processed} regions, {total_cities_processed} cities'))
batch = regions_to_update[i:i + batch_size]
Region.objects.bulk_update(batch, ['name', 'country', 'longitude', 'latitude'])
for i in tqdm(range(0, len(cities_to_update), batch_size), desc="Updating cities"): # Clean up obsolete records
batch = cities_to_update[i:i + batch_size] self.stdout.write(self.style.WARNING('Cleaning up obsolete records...'))
City.objects.bulk_update(batch, ['name', 'region', 'longitude', 'latitude']) with transaction.atomic():
countries_deleted = Country.objects.exclude(country_code__in=processed_country_codes).count()
regions_deleted = Region.objects.exclude(id__in=processed_region_ids).count()
cities_deleted = City.objects.exclude(id__in=processed_city_ids).count()
Country.objects.exclude(country_code__in=processed_country_codes).delete() Country.objects.exclude(country_code__in=processed_country_codes).delete()
Region.objects.exclude(id__in=processed_region_ids).delete() Region.objects.exclude(id__in=processed_region_ids).delete()
City.objects.exclude(id__in=processed_city_ids).delete() City.objects.exclude(id__in=processed_city_ids).delete()
if countries_deleted > 0 or regions_deleted > 0 or cities_deleted > 0:
self.stdout.write(f' Deleted {countries_deleted} obsolete countries, {regions_deleted} regions, {cities_deleted} cities')
else:
self.stdout.write(' No obsolete records found to delete')
self.stdout.write(self.style.SUCCESS('All data imported successfully')) self.stdout.write(self.style.SUCCESS('All data imported successfully'))
def _process_batches(self, countries_to_create, regions_to_create, cities_to_create,
                     countries_to_update, regions_to_update, cities_to_update, batch_size):
    """Write all pending creates and updates to the database in one transaction.

    Creates run before updates, and each phase goes countries -> regions ->
    cities. Empty lists are skipped without logging or touching the database.

    Args:
        countries_to_create: Unsaved Country instances to insert.
        regions_to_create: Unsaved Region instances to insert.
        cities_to_create: Unsaved City instances to insert.
        countries_to_update: Existing Country instances with modified fields.
        regions_to_update: Existing Region instances with modified fields.
        cities_to_update: Existing City instances with modified fields.
        batch_size: Maximum number of rows sent to the database per query.

    The lists are not modified here; the caller is responsible for clearing
    them afterwards.
    """
    # (model, pending objects, label used in the progress message)
    create_jobs = (
        (Country, countries_to_create, 'countries'),
        (Region, regions_to_create, 'regions'),
        (City, cities_to_create, 'cities'),
    )
    # Same as above plus the columns bulk_update is allowed to write.
    update_jobs = (
        (Country, countries_to_update, 'countries',
         ['name', 'subregion', 'capital', 'longitude', 'latitude']),
        (Region, regions_to_update, 'regions',
         ['name', 'country', 'longitude', 'latitude']),
        (City, cities_to_update, 'cities',
         ['name', 'region', 'longitude', 'latitude']),
    )

    # One atomic block so a failure in any step rolls back the whole flush.
    with transaction.atomic():
        for model, pending, label in create_jobs:
            if not pending:
                continue
            self.stdout.write(f' Creating {len(pending)} {label} in batches of {batch_size}...')
            # Let the ORM do the chunking instead of slicing by hand.
            # ignore_conflicts=True makes the database skip rows that would
            # violate a constraint instead of raising.
            model.objects.bulk_create(pending, batch_size=batch_size, ignore_conflicts=True)

        for model, pending, label, fields in update_jobs:
            if not pending:
                continue
            self.stdout.write(f' Updating {len(pending)} {label} in batches of {batch_size}...')
            model.objects.bulk_update(pending, fields, batch_size=batch_size)