Overview
Harmonize (enrich) your content catalog with Gracenote metadata through batch processing. This kind of application suits catalog harmonization, data normalization, batch processing, ETL pipelines, and scheduled updates. This example shows how to:
Read titles from CSV files or databases
Harmonize each with Gracenote metadata (TMSID, genres, cast, year)
Write harmonized data back to CSV or database
Run as command-line tool or scheduled job
CSV Harmonization
Get a working harmonization (enrichment) script running in 10 minutes.
Prerequisites
Before starting, complete the setup from Connecting to the MCP Server:
Dependencies installed: pip install litellm boto3 mcp python-dotenv
.env configured with your Cognito credentials and at least one LLM API key
mcp_utils.py copied into your project directory
Create Harmonization Script
Create enrich_csv.py. Update the DEFAULT_MODEL value in the script to match your model:
import asyncio
import csv
import json
import os
import re
import sys
from dotenv import load_dotenv
from litellm import completion
from mcp_utils import GracenoteClient
load_dotenv()
DEFAULT_MODEL = "claude-sonnet-4-6"
#DEFAULT_MODEL = "gpt-4o"
#DEFAULT_MODEL = "gemini/gemini-2.5-flash"
SYSTEM_PROMPT = """
You are a metadata enrichment assistant.
Use resolve_entities to find TMSIDs for titles.
You must respond with a single JSON object and nothing else.
Do not include any text, explanation, markdown formatting, or code fences before or after the JSON.
Your entire response must be parseable by json.loads() with no preprocessing.
Required JSON schema:
{"tmsId": "string", "title": "string", "year": number, "genres": ["string"], "cast": ["string"]}
Example of a correct response:
{"tmsId": "MV000722830000", "title": "The Matrix", "year": 1999, "genres": ["Action", "Sci-Fi"], "cast": ["Keanu Reeves"]}
"""
async def enrich_title (title, client, tools, model = DEFAULT_MODEL ):
"""Enrich a single title with Gracenote metadata."""
messages = [
{ "role" : "system" , "content" : SYSTEM_PROMPT },
{ "role" : "user" , "content" : f "Find metadata for: { title } " }
]
response = completion( model = model, messages = messages, tools = tools, max_tokens = 2048 )
response_message = response.choices[ 0 ].message
while response_message.get( "tool_calls" ):
messages.append(response_message.model_dump())
for tool_call in response_message.tool_calls:
tool_args = json.loads(tool_call.function.arguments)
result = await client.call_tool(tool_call.function.name, tool_args)
messages.append({
"role" : "tool" ,
"tool_call_id" : tool_call.id,
"name" : tool_call.function.name,
"content" : str (result.content)
})
response = completion( model = model, messages = messages, tools = tools, max_tokens = 2048 )
response_message = response.choices[ 0 ].message
# Extract JSON response — defensive parsing
# LLMs may wrap JSON in markdown code fences or add prose despite system prompt
text = (response_message.content or "" ).strip()
try :
return json.loads(text)
except json.JSONDecodeError:
pass
# Strip markdown code fencing if present
if "```" in text:
text = text.split( "```" )[ 1 ]
if text.startswith( "json" ):
text = text[ 4 :]
text = text.strip()
try :
return json.loads(text)
except json.JSONDecodeError:
pass
# Extract first JSON object from surrounding prose
match = re.search( r ' \{ [ ^ {}] * (?: \{ [ ^ {}] * \} [ ^ {}] * ) * \} ' , text)
if match:
try :
return json.loads(match.group())
except json.JSONDecodeError:
pass
return { "error" : "Failed to parse" , "title" : title}
async def main (input_file, output_file):
# Read input CSV
with open (input_file, 'r' ) as f:
titles = [row[ 'title' ] for row in csv.DictReader(f)]
print ( f "Enriching {len (titles) } titles..." )
async with GracenoteClient() as client:
tools = await client.tools_for_litellm()
results = []
for i, title in enumerate (titles, 1 ):
print ( f "[ { i } / {len (titles) } ] { title } " )
try :
data = await enrich_title(title, client, tools)
results.append(data)
except Exception as e:
print ( f " Error: { e } " )
results.append({ "error" : str (e), "title" : title})
# Write output CSV
with open (output_file, 'w' , newline = '' ) as f:
fieldnames = [ 'title' , 'tmsId' , 'year' , 'genres' , 'cast' , 'error' ]
writer = csv.DictWriter(f, fieldnames = fieldnames)
writer.writeheader()
for result in results:
writer.writerow({
'title' : result.get( 'title' , '' ),
'tmsId' : result.get( 'tmsId' , '' ),
'year' : result.get( 'year' , '' ),
'genres' : ',' .join(result.get( 'genres' , [])),
'cast' : ',' .join(result.get( 'cast' , [])[: 3 ]),
'error' : result.get( 'error' , '' )
})
print ( f " \n ✓ Done! Results written to { output_file } " )
success = sum ( 1 for r in results if not r.get( 'error' ))
print ( f " Success: { success } / {len (results) } " )
if __name__ == "__main__" :
if len (sys.argv) != 3 :
print ( "Usage: python enrich_csv.py input.csv output.csv" )
sys.exit( 1 )
asyncio.run(main(sys.argv[ 1 ], sys.argv[ 2 ]))
Create Sample Input
Create input.csv:
title
The Matrix (1999)
Breaking Bad
Friends
Inception
The Office
Run It
python3 enrich_csv.py input.csv output.csv
Output:
Enriching 5 titles...
[1/5] The Matrix (1999)
[2/5] Breaking Bad
[3/5] Friends
[4/5] Inception
[5/5] The Office
✓ Done! Results written to output.csv
Success: 5/5
Result (output.csv):
title,tmsId,year,genres,cast,error
The Matrix,MV000123456789,1999,"Action,Sci-Fi","Keanu Reeves,Laurence Fishburne,Carrie-Anne Moss",
Breaking Bad,SH014882360000,2008,"Drama,Crime","Bryan Cranston,Aaron Paul,Anna Gunn",
Friends,SH000279390000,1994,"Comedy,Romance","Jennifer Aniston,Courteney Cox,Lisa Kudrow",
...
Database Integration
Harmonize titles directly in your database:
# db_sync.py
import asyncio
import os
import psycopg2
from enrich_csv import enrich_title
from mcp_utils import GracenoteClient
from dotenv import load_dotenv
load_dotenv()
async def sync_database ():
"""Enrich titles missing TMSIDs in database."""
# Connect to database
conn = psycopg2.connect(
host = os.getenv( "DB_HOST" ),
database = os.getenv( "DB_NAME" ),
user = os.getenv( "DB_USER" ),
password = os.getenv( "DB_PASSWORD" )
)
cur = conn.cursor()
# Get titles without TMSIDs
cur.execute( "SELECT id, title FROM content WHERE tms_id IS NULL LIMIT 100" )
rows = cur.fetchall()
# Connect to MCP
async with GracenoteClient() as client:
for content_id, title in rows:
try :
result = await enrich_title(title, client)
cur.execute( """
UPDATE content
SET tms_id = %s ,
genres = %s ,
year = %s ,
updated_at = NOW()
WHERE id = %s
""" , (
result.get( 'tmsId' ),
result.get( 'genres' ),
result.get( 'year' ),
content_id
))
conn.commit()
print ( f "Updated: { title } " )
except Exception as e:
print ( f "Failed: { title } - { e } " )
cur.close()
conn.close()
if __name__ == "__main__" :
asyncio.run(sync_database())
See also: Search & Discovery | Personalized Recommendations | Best Practices | Tool Reference
Last modified on March 31, 2026