mirror of
https://github.com/seanmorley15/AdventureLog.git
synced 2025-07-23 14:59:36 +02:00
Enhance download-countries command with temporary SQLite database for efficient data processing; add batch size argument for improved performance and memory management.
This commit is contained in:
parent
37866660d3
commit
57aa2c9916
2 changed files with 416 additions and 275 deletions
|
@ -23,3 +23,4 @@ tqdm==4.67.1
|
||||||
overpy==0.7
|
overpy==0.7
|
||||||
publicsuffix2==2.20191221
|
publicsuffix2==2.20191221
|
||||||
geopy==2.4.1
|
geopy==2.4.1
|
||||||
|
psutil==6.1.1
|
|
@ -5,6 +5,9 @@ from worldtravel.models import Country, Region, City
|
||||||
from django.db import transaction
|
from django.db import transaction
|
||||||
import ijson
|
import ijson
|
||||||
import gc
|
import gc
|
||||||
|
import tempfile
|
||||||
|
import sqlite3
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
|
@ -36,86 +39,113 @@ def saveCountryFlag(country_code):
|
||||||
print(f'Error downloading flag for {country_code}')
|
print(f'Error downloading flag for {country_code}')
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
help = 'Imports the world travel data'
|
help = 'Imports the world travel data with minimal memory usage'
|
||||||
|
|
||||||
def add_arguments(self, parser):
|
def add_arguments(self, parser):
|
||||||
parser.add_argument('--force', action='store_true', help='Force download the countries+regions+states.json file')
|
parser.add_argument('--force', action='store_true', help='Force download the countries+regions+states.json file')
|
||||||
|
parser.add_argument('--batch-size', type=int, default=500, help='Batch size for database operations')
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _temp_db(self):
|
||||||
|
"""Create a temporary SQLite database for intermediate storage"""
|
||||||
|
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
|
||||||
|
temp_db_path = f.name
|
||||||
|
|
||||||
|
try:
|
||||||
|
conn = sqlite3.connect(temp_db_path)
|
||||||
|
conn.execute('''CREATE TABLE temp_countries (
|
||||||
|
country_code TEXT PRIMARY KEY,
|
||||||
|
name TEXT,
|
||||||
|
subregion TEXT,
|
||||||
|
capital TEXT,
|
||||||
|
longitude REAL,
|
||||||
|
latitude REAL
|
||||||
|
)''')
|
||||||
|
|
||||||
|
conn.execute('''CREATE TABLE temp_regions (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
name TEXT,
|
||||||
|
country_code TEXT,
|
||||||
|
longitude REAL,
|
||||||
|
latitude REAL
|
||||||
|
)''')
|
||||||
|
|
||||||
|
conn.execute('''CREATE TABLE temp_cities (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
name TEXT,
|
||||||
|
region_id TEXT,
|
||||||
|
longitude REAL,
|
||||||
|
latitude REAL
|
||||||
|
)''')
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
yield conn
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
try:
|
||||||
|
os.unlink(temp_db_path)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
def handle(self, **options):
|
def handle(self, **options):
|
||||||
force = options['force']
|
force = options['force']
|
||||||
batch_size = 500 # Larger batch size for better efficiency
|
batch_size = options['batch_size']
|
||||||
countries_json_path = os.path.join(settings.MEDIA_ROOT, f'countries+regions+states-{COUNTRY_REGION_JSON_VERSION}.json')
|
countries_json_path = os.path.join(settings.MEDIA_ROOT, f'countries+regions+states-{COUNTRY_REGION_JSON_VERSION}.json')
|
||||||
|
|
||||||
|
# Download or validate JSON file
|
||||||
if not os.path.exists(countries_json_path) or force:
|
if not os.path.exists(countries_json_path) or force:
|
||||||
|
self.stdout.write('Downloading JSON file...')
|
||||||
res = requests.get(f'https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/{COUNTRY_REGION_JSON_VERSION}/json/countries%2Bstates%2Bcities.json')
|
res = requests.get(f'https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/{COUNTRY_REGION_JSON_VERSION}/json/countries%2Bstates%2Bcities.json')
|
||||||
if res.status_code == 200:
|
if res.status_code == 200:
|
||||||
with open(countries_json_path, 'w') as f:
|
with open(countries_json_path, 'w') as f:
|
||||||
f.write(res.text)
|
f.write(res.text)
|
||||||
self.stdout.write(self.style.SUCCESS('countries+regions+states.json downloaded successfully'))
|
self.stdout.write(self.style.SUCCESS('JSON file downloaded successfully'))
|
||||||
else:
|
else:
|
||||||
self.stdout.write(self.style.ERROR('Error downloading countries+regions+states.json'))
|
self.stdout.write(self.style.ERROR('Error downloading JSON file'))
|
||||||
return
|
return
|
||||||
elif not os.path.isfile(countries_json_path):
|
elif not os.path.isfile(countries_json_path):
|
||||||
self.stdout.write(self.style.ERROR('countries+regions+states.json is not a file'))
|
self.stdout.write(self.style.ERROR('JSON file is not a file'))
|
||||||
return
|
return
|
||||||
elif os.path.getsize(countries_json_path) == 0:
|
elif os.path.getsize(countries_json_path) == 0:
|
||||||
self.stdout.write(self.style.ERROR('countries+regions+states.json is empty'))
|
self.stdout.write(self.style.ERROR('JSON file is empty'))
|
||||||
return
|
return
|
||||||
elif Country.objects.count() == 0 or Region.objects.count() == 0 or City.objects.count() == 0:
|
elif Country.objects.count() == 0 or Region.objects.count() == 0 or City.objects.count() == 0:
|
||||||
self.stdout.write(self.style.WARNING('Some region data is missing. Re-importing all data.'))
|
self.stdout.write(self.style.WARNING('Some data is missing. Re-importing all data.'))
|
||||||
else:
|
else:
|
||||||
self.stdout.write(self.style.SUCCESS('Latest country, region, and state data already downloaded.'))
|
self.stdout.write(self.style.SUCCESS('Latest data already imported.'))
|
||||||
return
|
return
|
||||||
|
|
||||||
self.stdout.write(self.style.SUCCESS('Starting memory-efficient import process...'))
|
self.stdout.write(self.style.SUCCESS('Starting ultra-memory-efficient import process...'))
|
||||||
|
|
||||||
# Pass 1: Process countries only
|
# Use temporary SQLite database for intermediate storage
|
||||||
self.stdout.write(self.style.WARNING('Pass 1: Processing countries...'))
|
with self._temp_db() as temp_conn:
|
||||||
processed_country_codes = self._process_countries_pass(countries_json_path, batch_size)
|
self.stdout.write('Step 1: Parsing JSON and storing in temporary database...')
|
||||||
|
self._parse_and_store_temp(countries_json_path, temp_conn)
|
||||||
|
|
||||||
# Pass 2: Process regions only
|
self.stdout.write('Step 2: Processing countries...')
|
||||||
self.stdout.write(self.style.WARNING('Pass 2: Processing regions...'))
|
self._process_countries_from_temp(temp_conn, batch_size)
|
||||||
processed_region_ids = self._process_regions_pass(countries_json_path, batch_size)
|
|
||||||
|
|
||||||
# Pass 3: Process cities only
|
self.stdout.write('Step 3: Processing regions...')
|
||||||
self.stdout.write(self.style.WARNING('Pass 3: Processing cities...'))
|
self._process_regions_from_temp(temp_conn, batch_size)
|
||||||
processed_city_ids = self._process_cities_pass(countries_json_path, batch_size)
|
|
||||||
|
|
||||||
# Clean up obsolete records
|
self.stdout.write('Step 4: Processing cities...')
|
||||||
self.stdout.write(self.style.WARNING('Pass 4: Cleaning up obsolete records...'))
|
self._process_cities_from_temp(temp_conn, batch_size)
|
||||||
with transaction.atomic():
|
|
||||||
countries_deleted = Country.objects.exclude(country_code__in=processed_country_codes).count()
|
|
||||||
regions_deleted = Region.objects.exclude(id__in=processed_region_ids).count()
|
|
||||||
cities_deleted = City.objects.exclude(id__in=processed_city_ids).count()
|
|
||||||
|
|
||||||
Country.objects.exclude(country_code__in=processed_country_codes).delete()
|
self.stdout.write('Step 5: Cleaning up obsolete records...')
|
||||||
Region.objects.exclude(id__in=processed_region_ids).delete()
|
self._cleanup_obsolete_records(temp_conn)
|
||||||
City.objects.exclude(id__in=processed_city_ids).delete()
|
|
||||||
|
|
||||||
if countries_deleted > 0 or regions_deleted > 0 or cities_deleted > 0:
|
self.stdout.write(self.style.SUCCESS('All data imported successfully with minimal memory usage'))
|
||||||
self.stdout.write(f' Deleted {countries_deleted} obsolete countries, {regions_deleted} regions, {cities_deleted} cities')
|
|
||||||
else:
|
|
||||||
self.stdout.write(' No obsolete records found to delete')
|
|
||||||
|
|
||||||
self.stdout.write(self.style.SUCCESS('All data imported successfully'))
|
def _parse_and_store_temp(self, json_path, temp_conn):
|
||||||
|
"""Parse JSON once and store in temporary SQLite database"""
|
||||||
def _process_countries_pass(self, json_path, batch_size):
|
|
||||||
"""First pass: Process only countries"""
|
|
||||||
self.stdout.write(' Loading existing countries...')
|
|
||||||
existing_countries = {c.country_code: c for c in Country.objects.all()}
|
|
||||||
self.stdout.write(f' Found {len(existing_countries)} existing countries')
|
|
||||||
|
|
||||||
processed_country_codes = set()
|
|
||||||
countries_to_create = []
|
|
||||||
countries_to_update = []
|
|
||||||
country_count = 0
|
country_count = 0
|
||||||
batches_processed = 0
|
region_count = 0
|
||||||
|
city_count = 0
|
||||||
|
|
||||||
with open(json_path, 'rb') as f:
|
with open(json_path, 'rb') as f:
|
||||||
parser = ijson.items(f, 'item')
|
parser = ijson.items(f, 'item')
|
||||||
|
|
||||||
for country in parser:
|
for country in parser:
|
||||||
country_count += 1
|
|
||||||
country_code = country['iso2']
|
country_code = country['iso2']
|
||||||
country_name = country['name']
|
country_name = country['name']
|
||||||
country_subregion = country['subregion']
|
country_subregion = country['subregion']
|
||||||
|
@ -123,255 +153,365 @@ class Command(BaseCommand):
|
||||||
longitude = round(float(country['longitude']), 6) if country['longitude'] else None
|
longitude = round(float(country['longitude']), 6) if country['longitude'] else None
|
||||||
latitude = round(float(country['latitude']), 6) if country['latitude'] else None
|
latitude = round(float(country['latitude']), 6) if country['latitude'] else None
|
||||||
|
|
||||||
processed_country_codes.add(country_code)
|
# Store country
|
||||||
|
temp_conn.execute('''INSERT OR REPLACE INTO temp_countries
|
||||||
|
(country_code, name, subregion, capital, longitude, latitude)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)''',
|
||||||
|
(country_code, country_name, country_subregion, country_capital, longitude, latitude))
|
||||||
|
|
||||||
|
country_count += 1
|
||||||
|
|
||||||
|
# Download flag (do this during parsing to avoid extra pass)
|
||||||
|
saveCountryFlag(country_code)
|
||||||
|
|
||||||
|
# Process regions/states
|
||||||
|
if country['states']:
|
||||||
|
for state in country['states']:
|
||||||
|
state_id = f"{country_code}-{state['state_code']}"
|
||||||
|
state_name = state['name']
|
||||||
|
state_lat = round(float(state['latitude']), 6) if state['latitude'] else None
|
||||||
|
state_lng = round(float(state['longitude']), 6) if state['longitude'] else None
|
||||||
|
|
||||||
|
temp_conn.execute('''INSERT OR REPLACE INTO temp_regions
|
||||||
|
(id, name, country_code, longitude, latitude)
|
||||||
|
VALUES (?, ?, ?, ?, ?)''',
|
||||||
|
(state_id, state_name, country_code, state_lng, state_lat))
|
||||||
|
|
||||||
|
region_count += 1
|
||||||
|
|
||||||
|
# Process cities
|
||||||
|
if 'cities' in state and state['cities']:
|
||||||
|
for city in state['cities']:
|
||||||
|
city_id = f"{state_id}-{city['id']}"
|
||||||
|
city_name = city['name']
|
||||||
|
city_lat = round(float(city['latitude']), 6) if city['latitude'] else None
|
||||||
|
city_lng = round(float(city['longitude']), 6) if city['longitude'] else None
|
||||||
|
|
||||||
|
temp_conn.execute('''INSERT OR REPLACE INTO temp_cities
|
||||||
|
(id, name, region_id, longitude, latitude)
|
||||||
|
VALUES (?, ?, ?, ?, ?)''',
|
||||||
|
(city_id, city_name, state_id, city_lng, city_lat))
|
||||||
|
|
||||||
|
city_count += 1
|
||||||
|
else:
|
||||||
|
# Country without states - create default region
|
||||||
|
state_id = f"{country_code}-00"
|
||||||
|
temp_conn.execute('''INSERT OR REPLACE INTO temp_regions
|
||||||
|
(id, name, country_code, longitude, latitude)
|
||||||
|
VALUES (?, ?, ?, ?, ?)''',
|
||||||
|
(state_id, country_name, country_code, None, None))
|
||||||
|
region_count += 1
|
||||||
|
|
||||||
|
# Commit periodically to avoid memory buildup
|
||||||
|
if country_count % 100 == 0:
|
||||||
|
temp_conn.commit()
|
||||||
|
self.stdout.write(f' Parsed {country_count} countries, {region_count} regions, {city_count} cities...')
|
||||||
|
|
||||||
|
temp_conn.commit()
|
||||||
|
self.stdout.write(f'✓ Parsing complete: {country_count} countries, {region_count} regions, {city_count} cities')
|
||||||
|
|
||||||
|
def _process_countries_from_temp(self, temp_conn, batch_size):
|
||||||
|
"""Process countries from temporary database"""
|
||||||
|
cursor = temp_conn.execute('SELECT country_code, name, subregion, capital, longitude, latitude FROM temp_countries')
|
||||||
|
|
||||||
|
countries_to_create = []
|
||||||
|
countries_to_update = []
|
||||||
|
processed = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
rows = cursor.fetchmany(batch_size)
|
||||||
|
if not rows:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Batch check for existing countries
|
||||||
|
country_codes_in_batch = [row[0] for row in rows]
|
||||||
|
existing_countries = {
|
||||||
|
c.country_code: c for c in
|
||||||
|
Country.objects.filter(country_code__in=country_codes_in_batch)
|
||||||
|
.only('country_code', 'name', 'subregion', 'capital', 'longitude', 'latitude')
|
||||||
|
}
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
country_code, name, subregion, capital, longitude, latitude = row
|
||||||
|
|
||||||
if country_code in existing_countries:
|
if country_code in existing_countries:
|
||||||
|
# Update existing
|
||||||
country_obj = existing_countries[country_code]
|
country_obj = existing_countries[country_code]
|
||||||
country_obj.name = country_name
|
country_obj.name = name
|
||||||
country_obj.subregion = country_subregion
|
country_obj.subregion = subregion
|
||||||
country_obj.capital = country_capital
|
country_obj.capital = capital
|
||||||
country_obj.longitude = longitude
|
country_obj.longitude = longitude
|
||||||
country_obj.latitude = latitude
|
country_obj.latitude = latitude
|
||||||
countries_to_update.append(country_obj)
|
countries_to_update.append(country_obj)
|
||||||
else:
|
else:
|
||||||
country_obj = Country(
|
countries_to_create.append(Country(
|
||||||
name=country_name,
|
|
||||||
country_code=country_code,
|
country_code=country_code,
|
||||||
subregion=country_subregion,
|
name=name,
|
||||||
capital=country_capital,
|
subregion=subregion,
|
||||||
|
capital=capital,
|
||||||
longitude=longitude,
|
longitude=longitude,
|
||||||
latitude=latitude
|
latitude=latitude
|
||||||
)
|
))
|
||||||
countries_to_create.append(country_obj)
|
|
||||||
|
|
||||||
# Download flag
|
processed += 1
|
||||||
saveCountryFlag(country_code)
|
|
||||||
|
|
||||||
# Process in batches to limit memory usage
|
# Flush batches
|
||||||
if len(countries_to_create) >= batch_size or len(countries_to_update) >= batch_size:
|
if countries_to_create:
|
||||||
batches_processed += 1
|
with transaction.atomic():
|
||||||
self.stdout.write(f' Saving batch {batches_processed} ({len(countries_to_create)} new, {len(countries_to_update)} updated)')
|
Country.objects.bulk_create(countries_to_create, batch_size=batch_size, ignore_conflicts=True)
|
||||||
self._flush_countries_batch(countries_to_create, countries_to_update, batch_size)
|
|
||||||
countries_to_create.clear()
|
countries_to_create.clear()
|
||||||
|
|
||||||
|
if countries_to_update:
|
||||||
|
with transaction.atomic():
|
||||||
|
Country.objects.bulk_update(
|
||||||
|
countries_to_update,
|
||||||
|
['name', 'subregion', 'capital', 'longitude', 'latitude'],
|
||||||
|
batch_size=batch_size
|
||||||
|
)
|
||||||
countries_to_update.clear()
|
countries_to_update.clear()
|
||||||
|
|
||||||
|
if processed % 1000 == 0:
|
||||||
|
self.stdout.write(f' Processed {processed} countries...')
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
if country_count % 50 == 0:
|
# Final flush
|
||||||
self.stdout.write(f' Processed {country_count} countries...')
|
if countries_to_create:
|
||||||
|
with transaction.atomic():
|
||||||
|
Country.objects.bulk_create(countries_to_create, batch_size=batch_size, ignore_conflicts=True)
|
||||||
|
if countries_to_update:
|
||||||
|
with transaction.atomic():
|
||||||
|
Country.objects.bulk_update(
|
||||||
|
countries_to_update,
|
||||||
|
['name', 'subregion', 'capital', 'longitude', 'latitude'],
|
||||||
|
batch_size=batch_size
|
||||||
|
)
|
||||||
|
|
||||||
# Process remaining countries
|
self.stdout.write(f'✓ Countries complete: {processed} processed')
|
||||||
if countries_to_create or countries_to_update:
|
|
||||||
batches_processed += 1
|
|
||||||
self.stdout.write(f' Saving final batch ({len(countries_to_create)} new, {len(countries_to_update)} updated)')
|
|
||||||
self._flush_countries_batch(countries_to_create, countries_to_update, batch_size)
|
|
||||||
|
|
||||||
self.stdout.write(self.style.SUCCESS(f' ✓ Completed: {country_count} countries processed in {batches_processed} batches'))
|
def _process_regions_from_temp(self, temp_conn, batch_size):
|
||||||
return processed_country_codes
|
"""Process regions from temporary database"""
|
||||||
|
# Get country mapping once
|
||||||
|
country_map = {c.country_code: c for c in Country.objects.only('id', 'country_code')}
|
||||||
|
|
||||||
def _process_regions_pass(self, json_path, batch_size):
|
cursor = temp_conn.execute('SELECT id, name, country_code, longitude, latitude FROM temp_regions')
|
||||||
"""Second pass: Process only regions"""
|
|
||||||
self.stdout.write(' Loading countries and existing regions...')
|
|
||||||
existing_regions = {r.id: r for r in Region.objects.all()}
|
|
||||||
countries_dict = {c.country_code: c for c in Country.objects.all()}
|
|
||||||
self.stdout.write(f' Found {len(existing_regions)} existing regions, {len(countries_dict)} countries')
|
|
||||||
|
|
||||||
processed_region_ids = set()
|
|
||||||
regions_to_create = []
|
regions_to_create = []
|
||||||
regions_to_update = []
|
regions_to_update = []
|
||||||
region_count = 0
|
processed = 0
|
||||||
batches_processed = 0
|
|
||||||
|
|
||||||
with open(json_path, 'rb') as f:
|
while True:
|
||||||
parser = ijson.items(f, 'item')
|
rows = cursor.fetchmany(batch_size)
|
||||||
|
if not rows:
|
||||||
|
break
|
||||||
|
|
||||||
for country in parser:
|
# Batch check for existing regions
|
||||||
country_code = country['iso2']
|
region_ids_in_batch = [row[0] for row in rows]
|
||||||
country_name = country['name']
|
existing_regions = {
|
||||||
country_obj = countries_dict[country_code]
|
r.id: r for r in
|
||||||
|
Region.objects.filter(id__in=region_ids_in_batch)
|
||||||
|
.select_related('country')
|
||||||
|
.only('id', 'name', 'country', 'longitude', 'latitude')
|
||||||
|
}
|
||||||
|
|
||||||
if country['states']:
|
for row in rows:
|
||||||
for state in country['states']:
|
region_id, name, country_code, longitude, latitude = row
|
||||||
name = state['name']
|
country_obj = country_map.get(country_code)
|
||||||
state_id = f"{country_code}-{state['state_code']}"
|
|
||||||
latitude = round(float(state['latitude']), 6) if state['latitude'] else None
|
|
||||||
longitude = round(float(state['longitude']), 6) if state['longitude'] else None
|
|
||||||
|
|
||||||
if state_id in processed_region_ids:
|
if not country_obj:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
processed_region_ids.add(state_id)
|
if region_id in existing_regions:
|
||||||
region_count += 1
|
# Update existing
|
||||||
|
region_obj = existing_regions[region_id]
|
||||||
if state_id in existing_regions:
|
|
||||||
region_obj = existing_regions[state_id]
|
|
||||||
region_obj.name = name
|
region_obj.name = name
|
||||||
region_obj.country = country_obj
|
region_obj.country = country_obj
|
||||||
region_obj.longitude = longitude
|
region_obj.longitude = longitude
|
||||||
region_obj.latitude = latitude
|
region_obj.latitude = latitude
|
||||||
regions_to_update.append(region_obj)
|
regions_to_update.append(region_obj)
|
||||||
else:
|
else:
|
||||||
region_obj = Region(
|
regions_to_create.append(Region(
|
||||||
id=state_id,
|
id=region_id,
|
||||||
name=name,
|
name=name,
|
||||||
country=country_obj,
|
country=country_obj,
|
||||||
longitude=longitude,
|
longitude=longitude,
|
||||||
latitude=latitude
|
latitude=latitude
|
||||||
)
|
))
|
||||||
regions_to_create.append(region_obj)
|
|
||||||
|
|
||||||
# Process in batches
|
processed += 1
|
||||||
if len(regions_to_create) >= batch_size or len(regions_to_update) >= batch_size:
|
|
||||||
batches_processed += 1
|
# Flush batches
|
||||||
self.stdout.write(f' Saving batch {batches_processed} ({len(regions_to_create)} new, {len(regions_to_update)} updated)')
|
if regions_to_create:
|
||||||
self._flush_regions_batch(regions_to_create, regions_to_update, batch_size)
|
with transaction.atomic():
|
||||||
|
Region.objects.bulk_create(regions_to_create, batch_size=batch_size, ignore_conflicts=True)
|
||||||
regions_to_create.clear()
|
regions_to_create.clear()
|
||||||
regions_to_update.clear()
|
|
||||||
gc.collect()
|
|
||||||
else:
|
|
||||||
# Country without states - create a default region
|
|
||||||
state_id = f"{country_code}-00"
|
|
||||||
if state_id not in processed_region_ids:
|
|
||||||
processed_region_ids.add(state_id)
|
|
||||||
region_count += 1
|
|
||||||
|
|
||||||
if state_id in existing_regions:
|
if regions_to_update:
|
||||||
region_obj = existing_regions[state_id]
|
with transaction.atomic():
|
||||||
region_obj.name = country_name
|
Region.objects.bulk_update(
|
||||||
region_obj.country = country_obj
|
regions_to_update,
|
||||||
regions_to_update.append(region_obj)
|
['name', 'country', 'longitude', 'latitude'],
|
||||||
else:
|
batch_size=batch_size
|
||||||
region_obj = Region(
|
|
||||||
id=state_id,
|
|
||||||
name=country_name,
|
|
||||||
country=country_obj
|
|
||||||
)
|
)
|
||||||
regions_to_create.append(region_obj)
|
regions_to_update.clear()
|
||||||
|
|
||||||
if region_count % 2000 == 0 and region_count > 0:
|
if processed % 2000 == 0:
|
||||||
self.stdout.write(f' Processed {region_count} regions...')
|
self.stdout.write(f' Processed {processed} regions...')
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
# Process remaining regions
|
# Final flush
|
||||||
if regions_to_create or regions_to_update:
|
if regions_to_create:
|
||||||
batches_processed += 1
|
with transaction.atomic():
|
||||||
self.stdout.write(f' Saving final batch ({len(regions_to_create)} new, {len(regions_to_update)} updated)')
|
Region.objects.bulk_create(regions_to_create, batch_size=batch_size, ignore_conflicts=True)
|
||||||
self._flush_regions_batch(regions_to_create, regions_to_update, batch_size)
|
if regions_to_update:
|
||||||
|
with transaction.atomic():
|
||||||
|
Region.objects.bulk_update(
|
||||||
|
regions_to_update,
|
||||||
|
['name', 'country', 'longitude', 'latitude'],
|
||||||
|
batch_size=batch_size
|
||||||
|
)
|
||||||
|
|
||||||
self.stdout.write(self.style.SUCCESS(f' ✓ Completed: {region_count} regions processed in {batches_processed} batches'))
|
self.stdout.write(f'✓ Regions complete: {processed} processed')
|
||||||
return processed_region_ids
|
|
||||||
|
|
||||||
def _process_cities_pass(self, json_path, batch_size):
|
def _process_cities_from_temp(self, temp_conn, batch_size):
|
||||||
"""Third pass: Process only cities"""
|
"""Process cities from temporary database with optimized existence checking"""
|
||||||
self.stdout.write(' Loading regions and existing cities...')
|
# Get region mapping once
|
||||||
existing_cities = {c.id: c for c in City.objects.all()}
|
region_map = {r.id: r for r in Region.objects.only('id')}
|
||||||
regions_dict = {r.id: r for r in Region.objects.all()}
|
|
||||||
self.stdout.write(f' Found {len(existing_cities)} existing cities, {len(regions_dict)} regions')
|
cursor = temp_conn.execute('SELECT id, name, region_id, longitude, latitude FROM temp_cities')
|
||||||
|
|
||||||
processed_city_ids = set()
|
|
||||||
cities_to_create = []
|
cities_to_create = []
|
||||||
cities_to_update = []
|
cities_to_update = []
|
||||||
city_count = 0
|
processed = 0
|
||||||
batches_processed = 0
|
|
||||||
|
|
||||||
with open(json_path, 'rb') as f:
|
while True:
|
||||||
parser = ijson.items(f, 'item')
|
rows = cursor.fetchmany(batch_size)
|
||||||
|
if not rows:
|
||||||
|
break
|
||||||
|
|
||||||
for country in parser:
|
# Fast existence check - only get IDs, no objects
|
||||||
country_code = country['iso2']
|
city_ids_in_batch = [row[0] for row in rows]
|
||||||
|
existing_city_ids = set(
|
||||||
|
City.objects.filter(id__in=city_ids_in_batch)
|
||||||
|
.values_list('id', flat=True)
|
||||||
|
)
|
||||||
|
|
||||||
if country['states']:
|
for row in rows:
|
||||||
for state in country['states']:
|
city_id, name, region_id, longitude, latitude = row
|
||||||
state_id = f"{country_code}-{state['state_code']}"
|
region_obj = region_map.get(region_id)
|
||||||
region_obj = regions_dict.get(state_id)
|
|
||||||
|
|
||||||
if not region_obj:
|
if not region_obj:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if 'cities' in state and len(state['cities']) > 0:
|
if city_id in existing_city_ids:
|
||||||
for city in state['cities']:
|
# For updates, just store the data - we'll do bulk update by raw SQL
|
||||||
city_id = f"{state_id}-{city['id']}"
|
cities_to_update.append({
|
||||||
city_name = city['name']
|
'id': city_id,
|
||||||
latitude = round(float(city['latitude']), 6) if city['latitude'] else None
|
'name': name,
|
||||||
longitude = round(float(city['longitude']), 6) if city['longitude'] else None
|
'region_id': region_obj.id,
|
||||||
|
'longitude': longitude,
|
||||||
if city_id in processed_city_ids:
|
'latitude': latitude
|
||||||
continue
|
})
|
||||||
|
|
||||||
processed_city_ids.add(city_id)
|
|
||||||
city_count += 1
|
|
||||||
|
|
||||||
if city_id in existing_cities:
|
|
||||||
city_obj = existing_cities[city_id]
|
|
||||||
city_obj.name = city_name
|
|
||||||
city_obj.region = region_obj
|
|
||||||
city_obj.longitude = longitude
|
|
||||||
city_obj.latitude = latitude
|
|
||||||
cities_to_update.append(city_obj)
|
|
||||||
else:
|
else:
|
||||||
city_obj = City(
|
cities_to_create.append(City(
|
||||||
id=city_id,
|
id=city_id,
|
||||||
name=city_name,
|
name=name,
|
||||||
region=region_obj,
|
region=region_obj,
|
||||||
longitude=longitude,
|
longitude=longitude,
|
||||||
latitude=latitude
|
latitude=latitude
|
||||||
)
|
))
|
||||||
cities_to_create.append(city_obj)
|
|
||||||
|
|
||||||
# Process in batches
|
processed += 1
|
||||||
if len(cities_to_create) >= batch_size or len(cities_to_update) >= batch_size:
|
|
||||||
batches_processed += 1
|
# Flush create batch (this is already fast)
|
||||||
self.stdout.write(f' Saving batch {batches_processed} ({len(cities_to_create)} new, {len(cities_to_update)} updated)')
|
if cities_to_create:
|
||||||
self._flush_cities_batch(cities_to_create, cities_to_update, batch_size)
|
with transaction.atomic():
|
||||||
|
City.objects.bulk_create(cities_to_create, batch_size=batch_size, ignore_conflicts=True)
|
||||||
cities_to_create.clear()
|
cities_to_create.clear()
|
||||||
|
|
||||||
|
# Flush update batch with raw SQL for speed
|
||||||
|
if cities_to_update:
|
||||||
|
self._bulk_update_cities_raw(cities_to_update)
|
||||||
cities_to_update.clear()
|
cities_to_update.clear()
|
||||||
|
|
||||||
|
if processed % 5000 == 0:
|
||||||
|
self.stdout.write(f' Processed {processed} cities...')
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
if city_count % 10000 == 0 and city_count > 0:
|
# Final flush
|
||||||
self.stdout.write(f' Processed {city_count} cities...')
|
|
||||||
|
|
||||||
# Process remaining cities
|
|
||||||
if cities_to_create or cities_to_update:
|
|
||||||
batches_processed += 1
|
|
||||||
self.stdout.write(f' Saving final batch ({len(cities_to_create)} new, {len(cities_to_update)} updated)')
|
|
||||||
self._flush_cities_batch(cities_to_create, cities_to_update, batch_size)
|
|
||||||
|
|
||||||
self.stdout.write(self.style.SUCCESS(f' ✓ Completed: {city_count} cities processed in {batches_processed} batches'))
|
|
||||||
return processed_city_ids
|
|
||||||
|
|
||||||
def _flush_countries_batch(self, countries_to_create, countries_to_update, batch_size):
|
|
||||||
"""Flush countries batch to database"""
|
|
||||||
with transaction.atomic():
|
|
||||||
if countries_to_create:
|
|
||||||
for i in range(0, len(countries_to_create), batch_size):
|
|
||||||
batch = countries_to_create[i:i + batch_size]
|
|
||||||
Country.objects.bulk_create(batch, ignore_conflicts=True)
|
|
||||||
|
|
||||||
if countries_to_update:
|
|
||||||
for i in range(0, len(countries_to_update), batch_size):
|
|
||||||
batch = countries_to_update[i:i + batch_size]
|
|
||||||
Country.objects.bulk_update(batch, ['name', 'subregion', 'capital', 'longitude', 'latitude'])
|
|
||||||
|
|
||||||
def _flush_regions_batch(self, regions_to_create, regions_to_update, batch_size):
|
|
||||||
"""Flush regions batch to database"""
|
|
||||||
with transaction.atomic():
|
|
||||||
if regions_to_create:
|
|
||||||
for i in range(0, len(regions_to_create), batch_size):
|
|
||||||
batch = regions_to_create[i:i + batch_size]
|
|
||||||
Region.objects.bulk_create(batch, ignore_conflicts=True)
|
|
||||||
|
|
||||||
if regions_to_update:
|
|
||||||
for i in range(0, len(regions_to_update), batch_size):
|
|
||||||
batch = regions_to_update[i:i + batch_size]
|
|
||||||
Region.objects.bulk_update(batch, ['name', 'country', 'longitude', 'latitude'])
|
|
||||||
|
|
||||||
def _flush_cities_batch(self, cities_to_create, cities_to_update, batch_size):
|
|
||||||
"""Flush cities batch to database"""
|
|
||||||
with transaction.atomic():
|
|
||||||
if cities_to_create:
|
if cities_to_create:
|
||||||
for i in range(0, len(cities_to_create), batch_size):
|
with transaction.atomic():
|
||||||
batch = cities_to_create[i:i + batch_size]
|
City.objects.bulk_create(cities_to_create, batch_size=batch_size, ignore_conflicts=True)
|
||||||
City.objects.bulk_create(batch, ignore_conflicts=True)
|
|
||||||
|
|
||||||
if cities_to_update:
|
if cities_to_update:
|
||||||
for i in range(0, len(cities_to_update), batch_size):
|
self._bulk_update_cities_raw(cities_to_update)
|
||||||
batch = cities_to_update[i:i + batch_size]
|
|
||||||
City.objects.bulk_update(batch, ['name', 'region', 'longitude', 'latitude'])
|
self.stdout.write(f'✓ Cities complete: {processed} processed')
|
||||||
|
|
||||||
|
def _bulk_update_cities_raw(self, cities_data):
|
||||||
|
"""Fast bulk update using raw SQL"""
|
||||||
|
if not cities_data:
|
||||||
|
return
|
||||||
|
|
||||||
|
from django.db import connection
|
||||||
|
|
||||||
|
with connection.cursor() as cursor:
|
||||||
|
# Build the SQL for bulk update
|
||||||
|
# Using CASE statements for efficient bulk updates
|
||||||
|
when_clauses_name = []
|
||||||
|
when_clauses_region = []
|
||||||
|
when_clauses_lng = []
|
||||||
|
when_clauses_lat = []
|
||||||
|
city_ids = []
|
||||||
|
|
||||||
|
for city in cities_data:
|
||||||
|
city_id = city['id']
|
||||||
|
city_ids.append(city_id)
|
||||||
|
when_clauses_name.append(f"WHEN id = %s THEN %s")
|
||||||
|
when_clauses_region.append(f"WHEN id = %s THEN %s")
|
||||||
|
when_clauses_lng.append(f"WHEN id = %s THEN %s")
|
||||||
|
when_clauses_lat.append(f"WHEN id = %s THEN %s")
|
||||||
|
|
||||||
|
# Build parameters list
|
||||||
|
params = []
|
||||||
|
for city in cities_data:
|
||||||
|
params.extend([city['id'], city['name']]) # for name
|
||||||
|
for city in cities_data:
|
||||||
|
params.extend([city['id'], city['region_id']]) # for region_id
|
||||||
|
for city in cities_data:
|
||||||
|
params.extend([city['id'], city['longitude']]) # for longitude
|
||||||
|
for city in cities_data:
|
||||||
|
params.extend([city['id'], city['latitude']]) # for latitude
|
||||||
|
params.extend(city_ids) # for WHERE clause
|
||||||
|
|
||||||
|
# Execute the bulk update
|
||||||
|
sql = f"""
|
||||||
|
UPDATE worldtravel_city
|
||||||
|
SET
|
||||||
|
name = CASE {' '.join(when_clauses_name)} END,
|
||||||
|
region_id = CASE {' '.join(when_clauses_region)} END,
|
||||||
|
longitude = CASE {' '.join(when_clauses_lng)} END,
|
||||||
|
latitude = CASE {' '.join(when_clauses_lat)} END
|
||||||
|
WHERE id IN ({','.join(['%s'] * len(city_ids))})
|
||||||
|
"""
|
||||||
|
|
||||||
|
cursor.execute(sql, params)
|
||||||
|
|
||||||
|
def _cleanup_obsolete_records(self, temp_conn):
    """Delete Country, Region and City rows that are missing from the temp database.

    The keys of every row in ``temp_countries``, ``temp_regions`` and
    ``temp_cities`` are read from ``temp_conn``; any Country, Region or
    City whose key is not among them is deleted. A summary line is written
    to ``self.stdout``.

    Args:
        temp_conn: Connection whose ``execute`` yields rows from the
            temporary tables (the first column of each row is the key).
    """
    def fetch_keys(query):
        # First column of every row, collected into a set.
        return {record[0] for record in temp_conn.execute(query)}

    # NOTE(review): all keys are materialised as Python sets and handed to
    # ``__in`` lookups, so memory use and query size grow with the number
    # of imported rows.
    keep_countries = fetch_keys('SELECT country_code FROM temp_countries')
    keep_regions = fetch_keys('SELECT id FROM temp_regions')
    keep_cities = fetch_keys('SELECT id FROM temp_cities')

    with transaction.atomic():
        stale_countries = Country.objects.exclude(country_code__in=keep_countries)
        stale_regions = Region.objects.exclude(id__in=keep_regions)
        stale_cities = City.objects.exclude(id__in=keep_cities)

        # Take all three counts before the first delete so the reported
        # numbers describe the state prior to any removal.
        n_countries = stale_countries.count()
        n_regions = stale_regions.count()
        n_cities = stale_cities.count()

        stale_countries.delete()
        stale_regions.delete()
        stale_cities.delete()

        if n_countries == 0 and n_regions == 0 and n_cities == 0:
            self.stdout.write('✓ No obsolete records found to delete')
        else:
            self.stdout.write(f'✓ Deleted {n_countries} obsolete countries, {n_regions} regions, {n_cities} cities')
|
Loading…
Add table
Add a link
Reference in a new issue