"""Core document model and base loader for snowloader.
This module defines the two building blocks that everything else in snowloader
depends on:

    SnowDocument: A framework-agnostic container for document content and
        metadata. Intentionally kept simple so it can be converted to
        LangChain Documents, LlamaIndex Documents, or any other format with
        minimal fuss.

    BaseSnowLoader: Abstract base class for all table-specific loaders.
        Provides the shared machinery for fetching records through
        SnowConnection, converting them to SnowDocuments, handling delta
        sync via load_since(), and pulling journal entries (work notes /
        comments) from sys_journal_field.

Subclasses only need to set a couple of class attributes (table name, content
fields) and optionally override _record_to_document() if they need custom
content assembly logic.

Author: Roni Das
"""
from __future__ import annotations
import logging
from collections.abc import Generator
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from snowloader.connection import SnowConnection, SnowConnectionError
logger = logging.getLogger(__name__)
@dataclass
class SnowDocument:
    """A single document extracted from a ServiceNow table.

    This is the intermediate format that lives between the raw API response
    and whatever the framework adapters produce. Every loader yields these,
    and every adapter consumes them.

    Attributes:
        page_content: The main text content of the document. How this is
            assembled depends on the specific loader (could be a short
            description, a KB article body, a concatenation of fields, etc).
        metadata: Key-value pairs describing where this document came from.
            Typically includes sys_id, number, table name, and any other
            fields the loader considers useful for retrieval or filtering.
            Defaults to a fresh empty dict per instance.
    """

    page_content: str
    # default_factory (not a literal {}) so instances never share one dict.
    metadata: dict[str, Any] = field(default_factory=dict)
class BaseSnowLoader:
    """Shared foundation for all ServiceNow table loaders.

    Subclasses must define:
        table: The ServiceNow table name to query (e.g. "incident").
        content_fields: List of field names whose values get concatenated
            into the document's page_content.

    The base class takes care of document assembly, delta sync, and journal
    fetching; record retrieval (including pagination) is delegated to the
    SnowConnection passed in. Most loaders will not need to override anything
    beyond the class attributes, but _record_to_document() is available
    as a hook for loaders that need fancier content formatting.

    Args:
        connection: An initialized SnowConnection instance.
        query: Optional encoded query string for filtering records.
        fields: Optional list of specific fields to request from the API.
            Passed through to the connection unchanged; None means no
            explicit field list is requested.
        include_journals: Whether to fetch and append work notes and
            comments from sys_journal_field for each record.

    Example:
        class IncidentLoader(BaseSnowLoader):
            table = "incident"
            content_fields = ["short_description", "description"]

        conn = SnowConnection(...)
        loader = IncidentLoader(connection=conn, query="active=true")
        for doc in loader.lazy_load():
            print(doc.page_content)
    """

    # Subclasses must set these. Rebind content_fields in the subclass instead
    # of appending to it: the empty list below is a single object shared by
    # every loader class that does not define its own.
    table: str = ""
    content_fields: list[str] = []

    def __init__(
        self,
        connection: SnowConnection,
        query: str | None = None,
        fields: list[str] | None = None,
        include_journals: bool = False,
    ) -> None:
        """Store the connection and query options; no request is made here."""
        self._connection = connection
        self._query = query
        self._fields = fields
        self._include_journals = include_journals

    def load(self) -> list[SnowDocument]:
        """Fetch all matching records and return them as a list.

        This is the simple, non-streaming interface. Under the hood it
        just drains lazy_load() into a list. For large tables, prefer
        lazy_load() directly to avoid holding everything in memory.

        Returns:
            List of SnowDocument instances, one per record.
        """
        return list(self.lazy_load())

    def lazy_load(self, since: datetime | None = None) -> Generator[SnowDocument, None, None]:
        """Fetch records and yield them one at a time as SnowDocuments.

        This is the primary loading interface. Records are pulled from
        SnowConnection.get_records() and converted to documents on the fly;
        nothing is accumulated in this method.

        Args:
            since: Optional cutoff datetime for delta sync, forwarded to the
                connection as-is. None requests no cutoff.

        Yields:
            SnowDocument instances, one per ServiceNow record.
        """
        records = self._connection.get_records(
            table=self.table,
            query=self._query,
            fields=self._fields,
            since=since,
        )
        for record in records:
            yield self._record_to_document(record)

    def load_since(self, since: datetime) -> list[SnowDocument]:
        """Fetch only records updated after the given datetime.

        Convenience wrapper around lazy_load() for incremental syncing.
        Pass the timestamp of your last successful sync and you will only
        get records that changed since then.

        Args:
            since: Cutoff datetime, forwarded to lazy_load().

        Returns:
            List of SnowDocument instances for the updated records.
        """
        return list(self.lazy_load(since=since))

    def concurrent_lazy_load(
        self,
        since: datetime | None = None,
        max_workers: int = 16,
    ) -> Generator[SnowDocument, None, None]:
        """Fetch records in parallel and yield them as SnowDocuments.

        This is the threaded counterpart to lazy_load(). The parallelism
        lives in SnowConnection.concurrent_get_records(); this method only
        forwards the options and converts each record as it arrives.

        NOTE: per the connection's contract, records are expected to arrive
        in the order pages complete, which is not the same as sys_created_on
        order. If you need a stable ordering, sort downstream or use
        lazy_load() instead.

        Args:
            since: Optional cutoff datetime for delta sync, forwarded to the
                connection as-is. None requests no cutoff.
            max_workers: Number of worker threads to request for page
                fetches. Defaults to 16. Higher values may trip ServiceNow
                rate limits on smaller instances.

        Yields:
            SnowDocument instances, one per ServiceNow record, in whatever
            order the connection produces them.
        """
        records = self._connection.concurrent_get_records(
            table=self.table,
            query=self._query,
            fields=self._fields,
            since=since,
            max_workers=max_workers,
        )
        for record in records:
            yield self._record_to_document(record)

    def concurrent_load(
        self,
        max_workers: int = 16,
        since: datetime | None = None,
    ) -> list[SnowDocument]:
        """Fetch all matching records in parallel and return them as a list.

        Threaded counterpart to load(). Drains concurrent_lazy_load() into
        a list, so the same ordering caveat applies. For very large tables,
        prefer concurrent_lazy_load() to keep memory bounded.

        Args:
            max_workers: Number of worker threads to request for page
                fetches. Defaults to 16.
            since: Optional cutoff datetime for delta sync. Placed after
                max_workers so existing positional callers keep working.
                None (the default) loads without a cutoff.

        Returns:
            List of SnowDocument instances, one per record, in the order
            concurrent_lazy_load() yielded them.
        """
        return list(self.concurrent_lazy_load(since=since, max_workers=max_workers))

    def _record_to_document(self, record: dict[str, Any]) -> SnowDocument:
        """Convert a single API record dict into a SnowDocument.

        Concatenates the non-empty values of content_fields into
        page_content, separated by newlines. When journals are enabled and
        the record has a sys_id, the formatted journal block is appended
        after a blank line. Every field of the record is copied into
        metadata, together with a "table" entry.

        Subclasses can override this for custom assembly logic.

        Args:
            record: Raw record dict from the ServiceNow API.

        Returns:
            A SnowDocument with assembled content and metadata.
        """
        # Pull text from the designated content fields, skipping falsy values
        # (missing keys, empty strings, None).
        content_parts = [
            str(value)
            for value in (record.get(name, "") for name in self.content_fields)
            if value
        ]
        page_content = "\n".join(content_parts)

        # Journal fetch is resilient - connection failures are logged inside
        # _fetch_journals(), so they do not abort the load.
        sys_id = str(record.get("sys_id", ""))
        if self._include_journals and sys_id:
            journal_text = self._format_journals(self._fetch_journals(sys_id))
            if journal_text:
                # Only insert the blank-line separator when there is content
                # in front of it; otherwise the document would start with
                # two stray newlines.
                page_content = (
                    f"{page_content}\n\n{journal_text}" if page_content else journal_text
                )

        # Everything goes into metadata for downstream filtering. The record
        # is unpacked last, so a record field that is itself named "table"
        # takes precedence over the loader's table name.
        metadata: dict[str, Any] = {"table": self.table, **record}
        return SnowDocument(page_content=page_content, metadata=metadata)

    def _fetch_journals(self, sys_id: str) -> list[dict[str, Any]]:
        """Pull work notes and comments for a record from sys_journal_field.

        Queries sys_journal_field for entries whose element_id equals the
        given sys_id and whose element is work_notes or comments.

        This method is resilient to connection failures: if the connection
        raises SnowConnectionError, a warning (with traceback) is logged and
        an empty list is returned. Other exception types propagate.

        Args:
            sys_id: The sys_id of the parent record.

        Returns:
            List of journal entry dicts as returned by the connection; the
            request asks for value, element, sys_created_on, and
            sys_created_by. Empty list on SnowConnectionError.
        """
        query = f"element_id={sys_id}^elementINwork_notes,comments"
        try:
            records = self._connection.get_records(
                table="sys_journal_field",
                query=query,
                fields=["value", "element", "sys_created_on", "sys_created_by"],
            )
            # Materialize inside the try: if get_records() is lazy, errors
            # only surface while iterating.
            return list(records)
        except SnowConnectionError:
            logger.warning(
                "Failed to fetch journals for record %s. Continuing without journal entries.",
                sys_id,
                exc_info=True,
            )
            return []

    def _format_journals(self, journals: list[dict[str, Any]]) -> str:
        """Turn a list of journal entry dicts into a readable text block.

        Each entry gets a header line with the type (work_notes or comments),
        timestamp, and author, followed by the entry text. Entries are
        separated by blank lines for readability. Missing keys fall back to
        "note" (type), "unknown" (author), and "" (timestamp, text).

        Args:
            journals: List of journal dicts as returned by _fetch_journals().

        Returns:
            Formatted string with all journal entries, or empty string if
            the input list is empty.
        """
        if not journals:
            return ""

        blocks = []
        for entry in journals:
            element = entry.get("element", "note")
            author = entry.get("sys_created_by", "unknown")
            timestamp = entry.get("sys_created_on", "")
            text = entry.get("value", "")
            blocks.append(f"[{element}] {timestamp} by {author}\n{text}")
        return "\n\n".join(blocks)