Create a list of Trove's digital periodicals¶
This notebook creates a list of digitised periodicals in Trove by searching for the digital identifier string nla.obj
and limiting the results to periodicals. Before the Trove API introduced the /magazine/titles
endpoint, this was the only way to generate such a list. This method produces slightly different results to the new API endpoint, and it might be useful to compare the two to see what each method misses. Get details of periodicals from the /magazine/titles API endpoint and Enrich the list of periodicals from the Trove API demonstrate how to compile a list of periodicals from the /magazine/titles
endpoint.
The harvesting strategy used in this notebook is similar to that described in the Trove Data Guides' HOW TO: Harvest data relating to digitised resources. Because of variations in the way digitised resources are described and organised, it seems best to harvest all available version records individually, and then merge duplicates at a later step.
The full search query used is "nla.obj" NOT series:"Parliamentary paper (Australia. Parliament)" NOT nuc:"ANL:NED"
. This attempts to exclude Parliamentary Papers and periodicals submitted through the National edeposit scheme.
# Let's import the libraries we need.
import json
import os
import re
from datetime import timedelta
from functools import reduce
from pathlib import Path
import pandas as pd
import requests_cache
from dotenv import load_dotenv
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from tqdm.auto import tqdm
s = requests_cache.CachedSession(expire_after=timedelta(days=30))
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount("https://", HTTPAdapter(max_retries=retries))
s.mount("http://", HTTPAdapter(max_retries=retries))
load_dotenv()
True
Add your Trove API key¶
You can get a Trove API key by following these instructions.
# Insert your Trove API key
API_KEY = "YOUR API KEY"
# Use api key value from environment variables if it is available
if os.getenv("TROVE_API_KEY"):
API_KEY = os.getenv("TROVE_API_KEY")
Define some functions to do the work¶
def get_total_results(params, headers):
"""
Get the total number of results for a search.
"""
these_params = params.copy()
these_params["n"] = 0
response = s.get(
"https://api.trove.nla.gov.au/v3/result", params=these_params, headers=headers
)
data = response.json()
return int(data["category"][0]["records"]["total"])
def get_value(record, field, keys=["value"]):
"""
Get the values of a field.
Some fields are lists of dicts, if so use the `key` to get the value.
"""
value = record.get(field, [])
if value and isinstance(value[0], dict):
for key in keys:
try:
return [re.sub(r"\s+", " ", v[key]) for v in value]
except KeyError:
pass
else:
return value
def merge_values(record, fields, keys=["value"]):
"""
Merges values from multiple fields, removing any duplicates.
"""
values = []
for field in fields:
values += get_value(record, field, keys)
# Remove duplicates and None value
return list(set([v for v in values if v is not None]))
def flatten_values(record, field, key="type"):
"""
If a field has a value and type, return the values as strings with this format: 'type: value'
"""
flattened = []
values = record.get(field, [])
for value in values:
if key in value:
flattened.append(f"{value[key]}: {value['value']}")
else:
flattened.append(value["value"])
return flattened
def flatten_identifiers(record):
"""
Get a list of control numbers from the identifier field and flatten the values.
"""
ids = {
"identifier": [
v
for v in record.get("identifier", [])
if "type" in v and v["type"] == "control number"
]
}
return flatten_values(ids, "identifier", "source")
def get_fulltext_url(links):
"""
Loop through the identifiers to find a link to the full text version of the book.
"""
urls = []
for link in links:
if (
"linktype" in link
and link["linktype"] == "fulltext"
and "nla.obj" in link["value"]
):
url = re.sub(r"^http\b", "https", link["value"])
link_text = link.get("linktext", "")
urls.append({"url": url, "link_text": link_text})
return urls
def get_catalogue_url(links):
"""
Loop through the identifiers to find a link to the NLA catalogue.
"""
for link in links:
if (
"linktype" in link
and link["linktype"] == "notonline"
and "nla.cat" in link["value"]
):
return link["value"]
return ""
def has_fulltext_link(links):
"""
Check if a list of identifiers includes a fulltext url pointing to an NLA resource.
"""
for link in links:
if (
"linktype" in link
and link["linktype"] == "fulltext"
and "nla.obj" in link["value"]
):
return True
def has_holding(holdings, nucs):
"""
Check if a list of holdings includes one of the supplied nucs.
"""
for holding in holdings:
if holding.get("nuc") in nucs:
return True
def get_digitised_versions(work):
"""
Get the versions from the given work that have a fulltext url pointing to an NLA resource
in the `identifier` field.
"""
versions = []
for version in work["version"]:
if "identifier" in version and has_fulltext_link(version["identifier"]):
versions.append(version)
return versions
def get_nuc_versions(work, nucs=["ANL", "ANL:DL"]):
"""
Get the versions from the given work that are held by the NLA.
"""
versions = []
for version in work["version"]:
if "holding" in version and has_holding(version["holding"], ["ANL", "ANL:DL"]):
versions.append(version)
return versions
def harvest_works(
params,
filter_by="url",
nucs=["ANL", "ANL:DL"],
output_file="harvested-metadata.ndjson",
):
"""
Harvest metadata relating to digitised works.
The filter_by parameter selects records for inclusion in the dataset, options:
* url -- only include versions that have an NLA fulltext url
* nuc -- only include versions that have an NLA nuc (ANL or ANL:DL)
"""
default_params = {
"category": "all",
"bulkHarvest": "true",
"n": 100,
"encoding": "json",
"include": ["links", "workversions", "holdings"],
}
params.update(default_params)
headers = {"X-API-KEY": API_KEY}
total = get_total_results(params, headers)
start = "*"
with Path(output_file).open("w") as ndjson_file:
with tqdm(total=total) as pbar:
while start:
params["s"] = start
response = s.get(
"https://api.trove.nla.gov.au/v3/result",
params=params,
headers=headers,
)
data = response.json()
items = data["category"][0]["records"]["item"]
for item in items:
for category, record in item.items():
if category == "work":
if filter_by == "nuc":
versions = get_nuc_versions(record, nucs)
else:
versions = get_digitised_versions(record)
# Sometimes there are fulltext links on work but not versions
if len(versions) == 0 and has_fulltext_link(
record["identifier"]
):
versions = record["version"]
for version in versions:
for sub_version in version["record"]:
metadata = sub_version["metadata"]["dc"]
# Sometimes fulltext identifiers are only available on the
# version rather than the sub version. So we'll look in the
# sub version first, and if they're not there use the url from
# the version.
# Sometimes there are multiple fulltext urls associated with a version:
# eg a collection page and a publication. If so add records for both urls.
# They could end up pointing to the same digitised publication, but
# we can sort that out later. Aim here is to try and not miss any possible
# routes to digitised publications!
urls = get_fulltext_url(
metadata.get("identifier", [])
)
if len(urls) == 0:
urls = get_fulltext_url(
version.get("identifier", [])
)
# Sometimes there are fulltext links on work but not versions
if len(urls) == 0:
urls = get_fulltext_url(
record.get("identifier", [])
)
if len(urls) == 0 and filter_by == "nuc":
urls = [{"url": "", "link_text": ""}]
for url in urls:
work = {
# This is not the full set of available fields,
# adjust as necessary.
"title": get_value(metadata, "title"),
"work_url": record.get("troveUrl"),
"work_type": record.get("type", []),
"contributor": merge_values(
metadata,
["creator", "contributor"],
["value", "name"],
),
"publisher": get_value(
metadata, "publisher"
),
"date": merge_values(
metadata, ["date", "issued"]
),
# Using merge here because I've noticed some duplicate values
"type": merge_values(metadata, ["type"]),
"format": get_value(metadata, "format"),
"rights": merge_values(
metadata, ["rights", "licenseRef"]
),
"language": get_value(metadata, "language"),
"extent": get_value(metadata, "extent"),
"subject": merge_values(
metadata, ["subject"]
),
"spatial": get_value(metadata, "spatial"),
# Flattened type/value
"is_part_of": flatten_values(
metadata, "isPartOf"
),
# Only get control numbers and flatten
"identifier": flatten_identifiers(metadata),
"fulltext_url": url["url"],
"fulltext_url_text": url["link_text"],
"catalogue_url": get_catalogue_url(
metadata["identifier"]
),
# Could also add in data from bibliographicCitation
# Although the types used in citations seem to vary by work and format.
}
ndjson_file.write(f"{json.dumps(work)}\n")
# The nextStart parameter is used to get the next page of results.
# If there's no nextStart then it means we're on the last page of results.
try:
start = data["category"][0]["records"]["nextStart"]
except KeyError:
start = None
pbar.update(len(items))
Run the harvest¶
params = {
"q": '"nla.obj" NOT series:"Parliamentary paper (Australia. Parliament)" NOT nuc:"ANL:NED"',
"l-format": "Periodical", # Journals only
"l-availability": "y",
}
harvest_works(params)
0%| | 0/1078 [00:00<?, ?it/s]
df = pd.read_json("harvested-metadata.ndjson", lines=True)
df.shape
(1274, 18)
Remove duplicates¶
def merge_column(columns):
values = []
for value in columns:
if isinstance(value, list):
values += [str(v) for v in value if v]
elif value:
values.append(str(value))
return " | ".join(sorted(set(values)))
def merge_records(df):
# df["pages"].fillna(0, inplace=True)
# df.fillna("", inplace=True)
# df["pages"] = df["pages"].astype("Int64")
# Add base dataset with columns that will always have only one value
dfs = [df[["fulltext_url"]].drop_duplicates()]
# Columns that potentially have multiple values which will be merged
columns = [
"title",
"work_url",
"work_type",
"contributor",
"publisher",
"date",
"type",
"format",
"extent",
"language",
"subject",
"spatial",
"is_part_of",
"identifier",
"rights",
"fulltext_url_text",
"catalogue_url",
]
# Merge values from each column in turn, creating a new dataframe from each
for column in columns:
dfs.append(
df.groupby(["fulltext_url"])[column].apply(merge_column).reset_index()
)
# Merge all the individual dataframes into one, linking on `text_file` value
df_merged = reduce(
lambda left, right: pd.merge(left, right, on=["fulltext_url"], how="left"), dfs
)
return df_merged
df_merged = merge_records(df)
# How many journals are there?
df_merged.shape[0]
929
df_merged.to_csv("periodical-works.csv", index=False)
Created by Tim Sherratt for the GLAM Workbench.