Initial commit

Zhongwei Li
2025-11-30 08:53:39 +08:00
commit f52a5ade52
29 changed files with 3131 additions and 0 deletions

.claude-plugin/plugin.json Normal file

@@ -0,0 +1,12 @@
{
"name": "candlekeep",
"description": "Personal knowledge base system that gives AI agents direct access to your books",
"version": "0.1.0",
"author": {
"name": "Sahar Carmel",
"email": "sahar@example.com"
},
"skills": [
"./skills"
]
}

README.md Normal file

@@ -0,0 +1,3 @@
# candlekeep
Personal knowledge base system that gives AI agents direct access to your books

plugin.lock.json Normal file

@@ -0,0 +1,149 @@
{
"$schema": "internal://schemas/plugin.lock.v1.json",
"pluginId": "gh:SaharCarmel/Sahar-claude-code-marketplace:plugins/candlekeep",
"normalized": {
"repo": null,
"ref": "refs/tags/v20251128.0",
"commit": "dfaf75c96ec36d638005c0808b446c90a0e2a1da",
"treeHash": "f194e9f4dbdb0ad54ac1540741b44cfbfe8cb734ed365e6221ea89ec295824c3",
"generatedAt": "2025-11-28T10:12:43.763377Z",
"toolVersion": "publish_plugins.py@0.2.0"
},
"origin": {
"remote": "git@github.com:zhongweili/42plugin-data.git",
"branch": "master",
"commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390",
"repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data"
},
"manifest": {
"name": "candlekeep",
"description": "Personal knowledge base system that gives AI agents direct access to your books",
"version": "0.1.0"
},
"content": {
"files": [
{
"path": "README.md",
"sha256": "15cc1b93ef7d25da79c7352d12777794e002c802f9ded9f4c5155c58c1d8b3f5"
},
{
"path": ".claude-plugin/plugin.json",
"sha256": "f625c96fced4d9ca668bfa887f2fa8c99bab00eea199d2449bc3fa0bb7d88ff7"
},
{
"path": "skills/candlekeep/uv.lock",
"sha256": "d52317703da3f1131c7ae2d3634386cacf96f29936c727f1b11b698a711223b1"
},
{
"path": "skills/candlekeep/alembic.ini",
"sha256": "7d52d5ff9bf626d8cbe3aced1ad651e8ff8ec77ac74d4be6de84b2371c97f5e1"
},
{
"path": "skills/candlekeep/pyproject.toml",
"sha256": "c8e67daa2643e940fb4d2424f0112da6436facaf47ddd4fd28c690c0fef9b272"
},
{
"path": "skills/candlekeep/README.md",
"sha256": "d5cd69d74d1878b94750709b91fb2222c280fb4ff7a7e59b29484b18d1998886"
},
{
"path": "skills/candlekeep/SKILL.md",
"sha256": "ccf14ab1de9d9c8f2fcb91318b727c7737dbeeea5fcde39f4c1e34e827c88181"
},
{
"path": "skills/candlekeep/alembic/script.py.mako",
"sha256": "d38920781b4d31ae1c0a71bc09f41c2ade8feab9e5a087e3f30cb4bbf0c1c9d3"
},
{
"path": "skills/candlekeep/alembic/env.py",
"sha256": "cf9097c3d282b9680a95eb7a53d941590a30c41c4bd2248010c0bbf26a49419a"
},
{
"path": "skills/candlekeep/alembic/README",
"sha256": "31595cf53626af9ed16e15c44fa43183209cc163fbc3ebcb904b22ac436a8884"
},
{
"path": "skills/candlekeep/alembic/versions/e5ffbf97468e_initial_schema.py",
"sha256": "9e96a002f81ca023c0387e143d23d33c7a40a29950491d17f7a00affd3647b4a"
},
{
"path": "skills/candlekeep/alembic/versions/350115ea15b8_add_table_of_contents_field.py",
"sha256": "10b8a1b649cf10dbdc6ed221b118a007110842153e85fe7acb5769b8e9f74260"
},
{
"path": "skills/candlekeep/src/candlekeep/__init__.py",
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
},
{
"path": "skills/candlekeep/src/candlekeep/cli.py",
"sha256": "6919fcf324c20d77e6f3f305c92b973a0b9cfeb0661630709ffa142535ae7a89"
},
{
"path": "skills/candlekeep/src/candlekeep/parsers/__init__.py",
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
},
{
"path": "skills/candlekeep/src/candlekeep/parsers/pdf.py",
"sha256": "f9e40a46c5e1d09a07c05eaef2d43be2bb0f9250b171ab8012f7874cb69be6c5"
},
{
"path": "skills/candlekeep/src/candlekeep/parsers/markdown.py",
"sha256": "68b5e85546b733bf20816f6b2ab95eaa6d7e7ddd87d05460b6c2a36d22cde2db"
},
{
"path": "skills/candlekeep/src/candlekeep/utils/config.py",
"sha256": "a7e41649c785f064c57b8a2760a68733db5b53f2904c36b3684807209fb285b2"
},
{
"path": "skills/candlekeep/src/candlekeep/utils/__init__.py",
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
},
{
"path": "skills/candlekeep/src/candlekeep/utils/hash_utils.py",
"sha256": "3d8e0438c5ebe476e221caf1240f1484b45ee40aa5cf94722e5401fb326277b6"
},
{
"path": "skills/candlekeep/src/candlekeep/utils/content_utils.py",
"sha256": "c112b61720687de91bdbe430dc44399758a40dbdab4ddc0cc9f3e76d3a372148"
},
{
"path": "skills/candlekeep/src/candlekeep/utils/file_utils.py",
"sha256": "5895275378df240e14c9fb0e0191bfd770441b71a06c495202740beb6e612e42"
},
{
"path": "skills/candlekeep/src/candlekeep/db/models.py",
"sha256": "97b7a176bb442d516f2399861d45fab1f60c248579ecf320237e1f302cc1d1d3"
},
{
"path": "skills/candlekeep/src/candlekeep/db/session.py",
"sha256": "2d5366054636acdd2923a6d70c9a25c73ebd48dde52b75d574d0dd3fb8ccb6fe"
},
{
"path": "skills/candlekeep/src/candlekeep/db/__init__.py",
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
},
{
"path": "skills/candlekeep/src/candlekeep/commands/add.py",
"sha256": "c3f0da77c2a3b9f759569792add5b153508d475a32fbf415622c6276f0731b17"
},
{
"path": "skills/candlekeep/src/candlekeep/commands/query.py",
"sha256": "5d9f8c4f8b8c1db4152822494a44762d6c1cf39b709e8325d91c726109a3196c"
},
{
"path": "skills/candlekeep/src/candlekeep/commands/__init__.py",
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
},
{
"path": "skills/candlekeep/src/candlekeep/commands/init.py",
"sha256": "37ada12cb02a820a7e5709fd86790d6cece0d57d1c9b33fc0206b94f036d23df"
}
],
"dirSha256": "f194e9f4dbdb0ad54ac1540741b44cfbfe8cb734ed365e6221ea89ec295824c3"
},
"security": {
"scannedAt": null,
"scannerVersion": null,
"flags": []
}
}

skills/candlekeep/README.md Normal file

@@ -0,0 +1,197 @@
# Candlekeep Skill
A personal knowledge base system that gives AI agents direct access to your books.
## Quick Start
### Installation
1. **Install Python dependencies**:
```bash
cd plugins/candlekeep/skills/candlekeep
uv sync
```
2. **Initialize Candlekeep**:
```bash
uv run candlekeep init
```
This creates `~/.candlekeep/` with database and configuration.
## Usage
### Add Books to Your Library
**Add a PDF:**
```bash
uv run candlekeep add-pdf ~/Documents/my-book.pdf \
--title "Book Title" \
--author "Author Name" \
--tags "programming,reference"
```
**Add a Markdown file:**
```bash
uv run candlekeep add-md ~/Documents/notes.md \
--title "My Notes" \
--tags "documentation"
```
### List Your Books
```bash
uv run candlekeep list
```
**Output:**
```
# Library Books (Total: 1)
## Book ID: 1
Title: The Pragmatic Programmer
Author: David Thomas, Andrew Hunt
Type: pdf
Pages: 352
```
Tags and other optional fields appear with `--full` or `--fields`.
### Query Your Library
Once books are added, simply ask Claude questions in natural language:
**Example Questions:**
- "Do I have any books on Python? Show me what they say about decorators"
- "What does my software architecture book say about microservices?"
- "List all books tagged with 'machine-learning'"
**Claude will automatically:**
1. List available books
2. Show table of contents
3. Extract relevant pages
4. Provide answers with page citations
## Available Commands
All commands use `uv run candlekeep` from the skill directory.
### `init`
Initialize the Candlekeep database and configuration.
### `add-pdf <file_path>`
Add a PDF book to your library.
**Options:**
- `--title` - Book title (optional, extracted from PDF metadata if not provided)
- `--author` - Author name (optional)
- `--category` - Book category (optional)
- `--tags` - Comma-separated tags (optional)
- `--keep-original/--no-keep-original` - Copy the original PDF into the library (default: keep)
### `add-md <file_path>`
Add a Markdown document to your library.
**Options:**
- `--title` - Document title (optional, extracted from frontmatter, the first heading, or the filename if not provided)
- `--author` - Author name (optional)
- `--category` - Document category (optional)
- `--tags` - Comma-separated tags (optional; overrides frontmatter tags)
### `list`
List all books in your library with metadata.
### `toc <book_id>`
Show the table of contents for a specific book.
```bash
uv run candlekeep toc 1
```
### `pages <book_id> --pages <ranges>`
Extract content from specific pages of a book. Ranges accept single pages and spans, e.g. `10-15` or `1,3,5-7`.
```bash
uv run candlekeep pages 1 --pages 10-15
```
**Returns:** Text content from pages 10-15, each page prefixed with a `### Page N` header.
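The output has this shape (title and page text abridged):
```
## Book ID: 1 - The Pragmatic Programmer
Pages: 10-15
### Page 10
[text of page 10]
### Page 11
[text of page 11]
...
```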
## How It Works
### Progressive Disclosure Pattern
Candlekeep uses a token-efficient approach:
1. **Discovery** - Lists book titles/metadata (~20-50 tokens per book)
2. **Navigation** - Shows table of contents when needed
3. **Extraction** - Fetches only requested page ranges
4. **Citation** - Includes page markers for precise references
This keeps context usage minimal while providing comprehensive access.
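For example, an agent-side driver can follow the same three steps programmatically. A minimal sketch (the `ck` helper is hypothetical, not part of this commit; it assumes the CLI is invoked from the skill directory):
```python
import subprocess

def ck(*args: str) -> str:
    """Run a candlekeep subcommand and return its stdout."""
    result = subprocess.run(
        ["uv", "run", "candlekeep", *args],
        capture_output=True, text=True, check=True,
    )
    return result.stdout

books = ck("list")                              # 1. Discovery: titles and metadata only
toc = ck("toc", "1")                            # 2. Navigation: chapter-to-page mapping
content = ck("pages", "1", "--pages", "45-47")  # 3. Extraction: only the pages needed
```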
### Privacy & Storage
- **All data stored locally** - metadata in `~/.candlekeep/candlekeep.db` (SQLite), content as markdown files in `~/.candlekeep/library/`
- **No external API calls** - purely local processing
- **Page-level granularity** - converted markdown keeps per-page markers for precise extraction
- **Indexed metadata** - database indexes on title, author, and category
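To see what lives where, you can open the metadata database directly. A read-only sketch, assuming the default paths created by `candlekeep init`:
```python
import sqlite3
from pathlib import Path

# Metadata lives in SQLite; book content stays in markdown files under
# ~/.candlekeep/library/ and is never transmitted anywhere.
db = sqlite3.connect(str(Path.home() / ".candlekeep" / "candlekeep.db"))
for book_id, title, author in db.execute(
    "SELECT id, title, author FROM books ORDER BY id"
):
    print(book_id, title, author)
db.close()
```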
## File Structure
```
~/.candlekeep/
├── candlekeep.db # SQLite database
└── config.json # Configuration
plugins/candlekeep/skills/candlekeep/
├── SKILL.md # Skill definition for Claude
├── README.md # This file
├── pyproject.toml # Python dependencies
├── alembic/ # Database migrations
└── src/
└── candlekeep/
├── cli.py # Command-line interface
├── commands/ # Command implementations
├── db/ # Database models
├── parsers/ # PDF/Markdown parsers
└── utils/ # Utilities
```
## Requirements
- **Python**: >=3.10
- **UV**: >=0.1.0 (package manager)
## Troubleshooting
### "Database not found"
Run `uv run candlekeep init` to initialize the database.
### "PDF parsing failed"
Ensure the PDF is not corrupted and has extractable text. Some scanned PDFs without OCR won't work.
### "Dependencies not installed"
Run `uv sync` from the skill directory to install all dependencies.
### "Command not found: candlekeep"
Make sure you're running commands with `uv run candlekeep` prefix from the skill directory.
## Tips for Great Results
1. **Use descriptive tags** - Makes it easier for Claude to find relevant books
2. **Add complete metadata** - Helps with discovery and citation
3. **Break large documents** - Consider splitting very large books into volumes
4. **Regular queries** - Reference books frequently to maximize value
5. **Organize by topic** - Use consistent tagging schemes
## Support
For issues or questions:
- Review the [Plugin README](../../README.md) for installation help
- Check the SKILL.md for Claude's internal documentation
- Verify database exists at `~/.candlekeep/candlekeep.db`
---
**Created by**: Sahar Carmel
**License**: MIT

skills/candlekeep/SKILL.md Normal file

@@ -0,0 +1,223 @@
---
name: candlekeep
description: Access a personal knowledge base of books to answer questions with actual source material rather than relying solely on training memory
---
# Candlekeep: Your Personal Library for AI Agents
Candlekeep is a knowledge base system that gives you direct access to books stored locally. Named after the legendary library fortress in D&D lore, it allows you to query actual book content rather than relying only on training data.
## Core Philosophy
**Books as Context, Not Data** - Candlekeep treats books as source material you can reference, maintaining precise page citations and table of contents navigation. This enables you to provide responses grounded in specific texts from the user's personal library.
## When to Use Candlekeep
Use Candlekeep when:
- User asks questions that could be answered from their book collection
- Research tasks require referencing specific documentation or books
- User mentions wanting to "check the book" or "look something up"
- User explicitly asks to search or query their knowledge base
- A task would benefit from grounded, citable source material
## Available Commands
All commands use `uv run candlekeep` from the skill directory.
### 1. Initialize (First Time Only)
```bash
cd ~/.claude/skills/candlekeep
uv run candlekeep init
```
Creates `~/.candlekeep/` directory with database and configuration.
### 2. List Books
```bash
uv run candlekeep list
```
Returns all books with metadata: ID, title, author, type, page count, date added.
**Output Format:**
```
# Library Books (Total: 1)
## Book ID: 1
Title: The Pragmatic Programmer
Author: David Thomas, Andrew Hunt
Type: pdf
Pages: 352
```
Pass `--full` to include all stored metadata (category, tags, ISBN, publisher, and so on).
### 3. Get Table of Contents
```bash
uv run candlekeep toc <book-id>
```
Returns hierarchical TOC for navigation.
**Example:**
```bash
uv run candlekeep toc 1
```
### 4. Extract Pages
```bash
uv run candlekeep pages <book-id> --pages <ranges>
```
Extracts content from a specific page range. Ranges accept single pages and spans, e.g. `45-47` or `1,3,5-7`.
**Example:**
```bash
uv run candlekeep pages 1 --pages 45-47
```
Returns markdown with per-page headers:
```
### Page 45
[Content from page 45]
### Page 46
[Content from page 46]
```
### 5. Add PDF Book
```bash
uv run candlekeep add-pdf /path/to/book.pdf
```
Converts PDF to markdown with page markers and stores in library.
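Internally, the stored markdown keeps an end-of-page marker after each page's text; the `pages` command later slices on these markers (shape per the extraction code in this commit):
```
[Content of page 1]
--- end of page=1 ---
[Content of page 2]
--- end of page=2 ---
```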
### 6. Add Markdown Book
```bash
uv run candlekeep add-md /path/to/book.md
```
Adds markdown book with YAML frontmatter for metadata.
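A minimal example file; `title`, `author`, and `tags` are read from the frontmatter, with fallbacks to the first heading and the filename when fields are missing:
```
---
title: My Notes on Distributed Systems
author: Jane Doe
tags: [distributed-systems, notes]
---
# My Notes on Distributed Systems
...
```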
## Usage Patterns
### Progressive Disclosure Workflow
Follow this token-efficient pattern:
1. **List** → Get all available books
2. **TOC** → Find relevant sections
3. **Pages** → Extract specific content
**Example Session:**
```bash
# Step 1: See what books are available
uv run candlekeep list
# Step 2: Get TOC to find relevant chapter
uv run candlekeep toc 1
# Step 3: Extract specific pages based on TOC
uv run candlekeep pages 1 --pages 45-52
```
### Token Efficiency Guidelines
- **Don't extract entire books** - Use TOC to identify relevant sections first
- **Request small page ranges** - Start with 3-5 pages, expand if needed
- **Cache TOC information** - Remember book structure within conversation
- **Use list sparingly** - Only re-list if user adds new books
### Citing Sources
When providing answers from Candlekeep:
- Always cite the book title and page numbers
- Example: "According to *The Pragmatic Programmer* (pages 45-47), ..."
- Maintain academic rigor by grounding responses in actual text
## Error Handling
### Common Issues
**"Candlekeep not initialized"**
```bash
cd ~/.claude/skills/candlekeep && uv run candlekeep init
```
**"Book ID not found"**
- Run `uv run candlekeep list` to see valid IDs
**"UV not found"**
- User needs to install UV package manager: https://github.com/astral-sh/uv
**"Python version error"**
- Requires Python 3.10+
## Installation & Setup
### First-Time Setup
```bash
# 1. Install Python dependencies
cd ~/.claude/skills/candlekeep
uv sync
# 2. Initialize Candlekeep
uv run candlekeep init
# 3. Add your first book
uv run candlekeep add-pdf ~/Books/my-book.pdf
```
### Dependencies
- Python 3.10+
- UV package manager
- PyMuPDF (installed via uv sync)
- SQLite (included with Python)
## Current Limitations
Candlekeep is in early development (Phase 2 complete):
- ✅ PDF and Markdown support with page markers
- ✅ Metadata extraction and TOC storage
- ✅ SQLite database with deduplication
- ⏳ Full-text search (not yet implemented)
- ⏳ Note-taking features (not yet implemented)
- ⏳ Session tracking (not yet implemented)
## Best Practices
1. **Check initialization first** - Before any operation, verify Candlekeep is initialized
2. **Progressive queries** - List → TOC → Pages
3. **Small page ranges** - Extract only what you need
4. **Cite sources** - Always reference book and pages
5. **Handle errors gracefully** - Provide actionable error messages to user
6. **Respect privacy** - All data is local-first, never transmitted
## Working Directory
All commands should be executed from:
```
~/.claude/skills/candlekeep/
```
The Candlekeep data directory is:
```
~/.candlekeep/
├── config.yaml # Configuration
├── candlekeep.db # SQLite database
├── library/ # Converted markdown files
└── originals/ # Original PDF/MD files (optional)
```
## Example Interaction
**User:** "Can you check if I have any books on software testing?"
**You:**
```bash
cd ~/.claude/skills/candlekeep && uv run candlekeep list
```
If books found, examine TOC:
```bash
uv run candlekeep toc 3
```
Extract relevant section:
```bash
uv run candlekeep pages 3 --pages 120-125
```
Provide answer with citation:
"Based on *Software Testing Fundamentals* (pages 120-125), here are the key principles..."

skills/candlekeep/alembic.ini Normal file

@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the tzdata library which can be installed by adding
# `alembic[tz]` to the pip requirements.
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
# sqlalchemy.url will be set programmatically in env.py
# sqlalchemy.url = sqlite:///~/.candlekeep/candlekeep.db
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

skills/candlekeep/alembic/README Normal file

@@ -0,0 +1 @@
Generic single-database configuration.

skills/candlekeep/alembic/env.py Normal file

@@ -0,0 +1,89 @@
from logging.config import fileConfig
from pathlib import Path
import os
import sys
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# Add parent directory to path to import our models
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.candlekeep.db.models import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Set database URL programmatically
candlekeep_dir = Path.home() / ".candlekeep"
db_path = candlekeep_dir / "candlekeep.db"
config.set_main_option("sqlalchemy.url", f"sqlite:///{db_path}")
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

skills/candlekeep/alembic/script.py.mako Normal file

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

skills/candlekeep/alembic/versions/350115ea15b8_add_table_of_contents_field.py Normal file

@@ -0,0 +1,30 @@
"""add_table_of_contents_field
Revision ID: 350115ea15b8
Revises: e5ffbf97468e
Create Date: 2025-11-01 17:03:15.297500
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '350115ea15b8'
down_revision: Union[str, Sequence[str], None] = 'e5ffbf97468e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# Add table_of_contents JSON column
op.add_column('books', sa.Column('table_of_contents', sa.JSON(), nullable=True))
def downgrade() -> None:
"""Downgrade schema."""
# Remove table_of_contents column
op.drop_column('books', 'table_of_contents')

skills/candlekeep/alembic/versions/e5ffbf97468e_initial_schema.py Normal file

@@ -0,0 +1,79 @@
"""Initial schema
Revision ID: e5ffbf97468e
Revises:
Create Date: 2025-11-01 11:55:11.896876
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'e5ffbf97468e'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('books',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('title', sa.String(length=500), nullable=False),
sa.Column('author', sa.String(length=255), nullable=True),
sa.Column('original_file_path', sa.String(length=1000), nullable=False),
sa.Column('markdown_file_path', sa.String(length=1000), nullable=False),
sa.Column('source_type', sa.Enum('PDF', 'MARKDOWN', name='sourcetype'), nullable=False),
sa.Column('file_hash', sa.String(length=64), nullable=False),
sa.Column('added_date', sa.DateTime(), nullable=False),
sa.Column('modified_date', sa.DateTime(), nullable=False),
sa.Column('pdf_creation_date', sa.DateTime(), nullable=True),
sa.Column('pdf_mod_date', sa.DateTime(), nullable=True),
sa.Column('pdf_creator', sa.String(length=255), nullable=True),
sa.Column('pdf_producer', sa.String(length=255), nullable=True),
sa.Column('page_count', sa.Integer(), nullable=True),
sa.Column('word_count', sa.Integer(), nullable=True),
sa.Column('chapter_count', sa.Integer(), nullable=True),
sa.Column('subject', sa.String(length=500), nullable=True),
sa.Column('keywords', sa.Text(), nullable=True),
sa.Column('category', sa.String(length=100), nullable=True),
sa.Column('tags', sa.JSON(), nullable=True),
sa.Column('isbn', sa.String(length=20), nullable=True),
sa.Column('publisher', sa.String(length=255), nullable=True),
sa.Column('publication_year', sa.Integer(), nullable=True),
sa.Column('language', sa.String(length=10), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('file_hash')
)
op.create_index(op.f('ix_books_author'), 'books', ['author'], unique=False)
op.create_index(op.f('ix_books_category'), 'books', ['category'], unique=False)
op.create_index(op.f('ix_books_source_type'), 'books', ['source_type'], unique=False)
op.create_index(op.f('ix_books_title'), 'books', ['title'], unique=False)
op.create_table('book_notes',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('book_id', sa.Integer(), nullable=False),
sa.Column('note_type', sa.Enum('SUMMARY', 'REVIEW', 'TAG', 'OTHER', name='notetype'), nullable=False),
sa.Column('content', sa.Text(), nullable=False),
sa.Column('created_date', sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(['book_id'], ['books.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index('idx_book_type', 'book_notes', ['book_id', 'note_type'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('idx_book_type', table_name='book_notes')
op.drop_table('book_notes')
op.drop_index(op.f('ix_books_title'), table_name='books')
op.drop_index(op.f('ix_books_source_type'), table_name='books')
op.drop_index(op.f('ix_books_category'), table_name='books')
op.drop_index(op.f('ix_books_author'), table_name='books')
op.drop_table('books')
# ### end Alembic commands ###

skills/candlekeep/pyproject.toml Normal file

@@ -0,0 +1,25 @@
[project]
name = "candlekeep"
version = "0.1.0"
description = "A personal library that brings the wisdom of books to your AI agents"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"typer[all]>=0.9.0",
"sqlalchemy>=2.0.0",
"alembic>=1.13.0",
"pymupdf>=1.23.0",
"pymupdf4llm>=0.0.6",
"python-frontmatter>=1.0.0",
"rich>=13.0.0",
]
[project.scripts]
candlekeep = "candlekeep.cli:app"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/candlekeep"]

skills/candlekeep/src/candlekeep/cli.py Normal file

@@ -0,0 +1,42 @@
"""CandleKeep CLI - Main entry point."""
import typer
from rich.console import Console
from .commands.init import init_command
from .commands.add import add_pdf, add_md
from .commands.query import list_books, get_toc, get_pages
app = typer.Typer(
name="candlekeep",
help="A personal library that brings the wisdom of books to your AI agents",
add_completion=False,
)
console = Console()
@app.command()
def init():
"""Initialize CandleKeep configuration and database."""
init_command()
# Register add commands
app.command(name="add-pdf")(add_pdf)
app.command(name="add-md")(add_md)
# Register query commands
app.command(name="list")(list_books)
app.command(name="toc")(get_toc)
app.command(name="pages")(get_pages)
@app.callback()
def main():
"""CandleKeep - Your personal library for AI agents."""
pass
if __name__ == "__main__":
app()

skills/candlekeep/src/candlekeep/commands/add.py Normal file

@@ -0,0 +1,394 @@
"""Commands for adding books to the library."""
import shutil
from pathlib import Path
from typing import Optional, List
import typer
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.panel import Panel
from rich.table import Table
from sqlalchemy.exc import IntegrityError
from ..db.models import Book, SourceType
from ..db.session import get_db_manager
from ..parsers.pdf import parse_pdf
from ..parsers.markdown import parse_markdown
from ..utils.config import get_config
from ..utils.file_utils import sanitize_filename, ensure_directory, get_unique_filename
from ..utils.hash_utils import compute_file_hash
console = Console()
app = typer.Typer()
@app.command("add-pdf")
def add_pdf(
file_path: Path = typer.Argument(..., help="Path to PDF file", exists=True, dir_okay=False),
category: Optional[str] = typer.Option(None, "--category", "-c", help="Book category"),
tags: Optional[str] = typer.Option(None, "--tags", "-t", help="Comma-separated tags"),
keep_original: bool = typer.Option(True, "--keep-original/--no-keep-original", help="Keep original PDF file"),
title: Optional[str] = typer.Option(None, "--title", help="Override extracted title"),
author: Optional[str] = typer.Option(None, "--author", help="Override extracted author"),
):
"""
Add a PDF book to the CandleKeep library.
The PDF will be converted to markdown and metadata will be extracted and stored.
"""
try:
config = get_config()
# Check if CandleKeep is initialized
if not config.is_initialized:
console.print("[red]Error:[/red] CandleKeep not initialized. Run 'candlekeep init' first.")
raise typer.Exit(1)
# Validate file is a PDF
if file_path.suffix.lower() != '.pdf':
console.print(f"[red]Error:[/red] File must be a PDF, got: {file_path.suffix}")
raise typer.Exit(1)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
# Step 1: Compute file hash
task = progress.add_task("[cyan]Computing file hash...", total=None)
file_hash = compute_file_hash(file_path)
progress.update(task, completed=True)
# Step 2: Check for duplicates
task = progress.add_task("[cyan]Checking for duplicates...", total=None)
db_manager = get_db_manager()
with db_manager.get_session() as session:
existing = session.query(Book).filter(Book.file_hash == file_hash).first()
if existing:
progress.stop()
console.print(f"\n[yellow]Book already exists:[/yellow] {existing.title} (ID: {existing.id})")
raise typer.Exit(0)
progress.update(task, completed=True)
# Step 3: Parse PDF and extract metadata
task = progress.add_task("[cyan]Parsing PDF and extracting metadata...", total=None)
try:
metadata = parse_pdf(file_path, convert_to_md=True)
except Exception as e:
progress.stop()
console.print(f"\n[red]Error parsing PDF:[/red] {e}")
raise typer.Exit(1)
progress.update(task, completed=True)
# Override metadata if provided
if title:
metadata['title'] = title
if author:
metadata['author'] = author
if category:
metadata['category'] = category
# Parse tags
tag_list = None
if tags:
tag_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
# Step 4: Save markdown to library
task = progress.add_task("[cyan]Saving markdown to library...", total=None)
# Generate filename from title
safe_filename = sanitize_filename(metadata['title'])
md_filepath = get_unique_filename(config.library_dir, safe_filename, '.md')
# Write markdown content
ensure_directory(config.library_dir)
with open(md_filepath, 'w', encoding='utf-8') as f:
f.write(metadata['markdown_content'])
progress.update(task, completed=True)
# Step 5: Optionally copy original PDF
original_path = file_path
if keep_original:
task = progress.add_task("[cyan]Copying original PDF...", total=None)
ensure_directory(config.originals_dir)
original_copy_path = get_unique_filename(config.originals_dir, safe_filename, '.pdf')
shutil.copy2(file_path, original_copy_path)
original_path = original_copy_path
progress.update(task, completed=True)
# Step 6: Insert into database
task = progress.add_task("[cyan]Storing metadata in database...", total=None)
book = Book(
title=metadata.get('title', 'Untitled'),
author=metadata.get('author'),
original_file_path=str(original_path),
markdown_file_path=str(md_filepath),
source_type=SourceType.PDF,
file_hash=file_hash,
pdf_creation_date=metadata.get('pdf_creation_date'),
pdf_mod_date=metadata.get('pdf_mod_date'),
pdf_creator=metadata.get('pdf_creator'),
pdf_producer=metadata.get('pdf_producer'),
page_count=metadata.get('page_count'),
word_count=metadata.get('word_count'),
chapter_count=metadata.get('chapter_count', 0),
table_of_contents=metadata.get('table_of_contents'),
subject=metadata.get('subject'),
keywords=metadata.get('keywords'),
category=category,
tags=tag_list,
language='en',
)
try:
with db_manager.get_session() as session:
session.add(book)
session.flush() # Get the ID
book_id = book.id
progress.update(task, completed=True)
except IntegrityError as e:
progress.stop()
console.print(f"\n[red]Database error:[/red] {e}")
# Clean up created files
if md_filepath.exists():
md_filepath.unlink()
if keep_original and original_copy_path.exists():
original_copy_path.unlink()
raise typer.Exit(1)
# Success message
_display_success(book_id, metadata, md_filepath, category, tag_list)
except typer.Exit:
raise
except Exception as e:
console.print(f"\n[red]Unexpected error:[/red] {e}")
raise typer.Exit(1)
def _display_success(
book_id: int,
metadata: dict,
md_filepath: Path,
category: Optional[str],
tags: Optional[List[str]]
):
"""Display success message with book details."""
# Create details table
table = Table(show_header=False, box=None, padding=(0, 2))
table.add_column("Field", style="cyan")
table.add_column("Value", style="white")
table.add_row("ID", str(book_id))
table.add_row("Title", metadata.get('title', 'Untitled'))
if metadata.get('author'):
table.add_row("Author", metadata['author'])
if category:
table.add_row("Category", category)
if tags:
table.add_row("Tags", ", ".join(tags))
table.add_row("Pages", str(metadata.get('page_count', 'N/A')))
table.add_row("Words", f"{metadata.get('word_count', 0):,}")
table.add_row("Chapters", str(metadata.get('chapter_count', 0)))
table.add_row("Markdown", str(md_filepath))
panel = Panel(
table,
title="[green bold]✓ Book Added Successfully",
border_style="green",
)
console.print()
console.print(panel)
@app.command("add-md")
def add_md(
file_path: Path = typer.Argument(..., help="Path to markdown file", exists=True, dir_okay=False),
category: Optional[str] = typer.Option(None, "--category", "-c", help="Book category"),
tags: Optional[str] = typer.Option(None, "--tags", "-t", help="Comma-separated tags"),
title: Optional[str] = typer.Option(None, "--title", help="Override extracted title"),
author: Optional[str] = typer.Option(None, "--author", help="Override extracted author"),
):
"""
Add a markdown book to the CandleKeep library.
The markdown file will be copied to the library and metadata will be extracted and stored.
Metadata can be provided via YAML frontmatter or will be extracted from the document structure.
"""
try:
config = get_config()
# Check if CandleKeep is initialized
if not config.is_initialized:
console.print("[red]Error:[/red] CandleKeep not initialized. Run 'candlekeep init' first.")
raise typer.Exit(1)
# Validate file is markdown
if file_path.suffix.lower() not in ['.md', '.markdown']:
console.print(f"[red]Error:[/red] File must be a markdown file (.md or .markdown), got: {file_path.suffix}")
raise typer.Exit(1)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
# Step 1: Compute file hash
task = progress.add_task("[cyan]Computing file hash...", total=None)
file_hash = compute_file_hash(file_path)
progress.update(task, completed=True)
# Step 2: Check for duplicates
task = progress.add_task("[cyan]Checking for duplicates...", total=None)
db_manager = get_db_manager()
with db_manager.get_session() as session:
existing = session.query(Book).filter(Book.file_hash == file_hash).first()
if existing:
progress.stop()
console.print(f"\n[yellow]Book already exists:[/yellow] {existing.title} (ID: {existing.id})")
raise typer.Exit(0)
progress.update(task, completed=True)
# Step 3: Parse markdown and extract metadata
task = progress.add_task("[cyan]Parsing markdown and extracting metadata...", total=None)
try:
metadata = parse_markdown(file_path)
except Exception as e:
progress.stop()
console.print(f"\n[red]Error parsing markdown:[/red] {e}")
raise typer.Exit(1)
progress.update(task, completed=True)
# Override metadata if provided
if title:
metadata['title'] = title
if author:
metadata['author'] = author
if category:
metadata['category'] = category
# Parse tags
tag_list = metadata.get('tags', [])
if tags:
# CLI tags override frontmatter tags
tag_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
elif isinstance(tag_list, list):
# Use frontmatter tags as-is
pass
else:
tag_list = None
# Step 4: Copy markdown to library
task = progress.add_task("[cyan]Copying markdown to library...", total=None)
# Generate filename from title
safe_filename = sanitize_filename(metadata['title'])
md_filepath = get_unique_filename(config.library_dir, safe_filename, '.md')
# Copy file to library
ensure_directory(config.library_dir)
shutil.copy2(file_path, md_filepath)
progress.update(task, completed=True)
# Step 5: Insert into database
task = progress.add_task("[cyan]Storing metadata in database...", total=None)
book = Book(
title=metadata.get('title', 'Untitled'),
author=metadata.get('author'),
original_file_path=str(file_path),
markdown_file_path=str(md_filepath),
source_type=SourceType.MARKDOWN,
file_hash=file_hash,
page_count=None, # Markdown doesn't have pages
word_count=metadata.get('word_count'),
chapter_count=metadata.get('chapter_count', 0),
table_of_contents=metadata.get('table_of_contents'),
subject=metadata.get('subject'),
keywords=metadata.get('keywords'),
category=category or metadata.get('category'),
tags=tag_list,
isbn=metadata.get('isbn'),
publisher=metadata.get('publisher'),
publication_year=metadata.get('publication_year'),
language=metadata.get('language', 'en'),
)
try:
with db_manager.get_session() as session:
session.add(book)
session.flush() # Get the ID
book_id = book.id
progress.update(task, completed=True)
except IntegrityError as e:
progress.stop()
console.print(f"\n[red]Database error:[/red] {e}")
# Clean up created file
if md_filepath.exists():
md_filepath.unlink()
raise typer.Exit(1)
# Success message
_display_success_md(book_id, metadata, md_filepath, category or metadata.get('category'), tag_list)
except typer.Exit:
raise
except Exception as e:
console.print(f"\n[red]Unexpected error:[/red] {e}")
raise typer.Exit(1)
def _display_success_md(
book_id: int,
metadata: dict,
md_filepath: Path,
category: Optional[str],
tags: Optional[List[str]]
):
"""Display success message for markdown book with details."""
# Create details table
table = Table(show_header=False, box=None, padding=(0, 2))
table.add_column("Field", style="cyan")
table.add_column("Value", style="white")
table.add_row("ID", str(book_id))
table.add_row("Title", metadata.get('title', 'Untitled'))
if metadata.get('author'):
table.add_row("Author", metadata['author'])
if category:
table.add_row("Category", category)
if tags:
table.add_row("Tags", ", ".join(tags))
table.add_row("Words", f"{metadata.get('word_count', 0):,}")
table.add_row("Chapters", str(metadata.get('chapter_count', 0)))
if metadata.get('isbn'):
table.add_row("ISBN", metadata['isbn'])
if metadata.get('publisher'):
table.add_row("Publisher", metadata['publisher'])
if metadata.get('publication_year'):
table.add_row("Year", str(metadata['publication_year']))
table.add_row("File", str(md_filepath))
panel = Panel(
table,
title="[green bold]✓ Markdown Book Added Successfully",
border_style="green",
)
console.print()
console.print(panel)

skills/candlekeep/src/candlekeep/commands/init.py Normal file

@@ -0,0 +1,76 @@
"""Init command - initialize CandleKeep configuration."""
import subprocess
from pathlib import Path
import typer
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm
console = Console()
def init_command():
"""Initialize CandleKeep configuration and database."""
candlekeep_dir = Path.home() / ".candlekeep"
library_dir = candlekeep_dir / "library"
originals_dir = candlekeep_dir / "originals"
db_path = candlekeep_dir / "candlekeep.db"
# Check if already initialized
if candlekeep_dir.exists() and db_path.exists():
console.print("[yellow]⚠ CandleKeep is already initialized.[/yellow]")
console.print(f"Database: {db_path}")
console.print(f"Library: {library_dir}")
if not Confirm.ask("Do you want to reinitialize?", default=False):
console.print("[cyan]Initialization cancelled.[/cyan]")
return
console.print(
Panel.fit(
"[bold cyan]CandleKeep Initialization[/bold cyan]\n\n"
"Setting up your local library with SQLite database.",
border_style="cyan"
)
)
# Create directories
console.print("\n[cyan]Creating directories...[/cyan]")
candlekeep_dir.mkdir(parents=True, exist_ok=True)
library_dir.mkdir(parents=True, exist_ok=True)
originals_dir.mkdir(parents=True, exist_ok=True)
console.print(f"[green]✓[/green] Created {candlekeep_dir}")
console.print(f"[green]✓[/green] Created {library_dir}")
console.print(f"[green]✓[/green] Created {originals_dir}")
# Run Alembic migrations
console.print("\n[cyan]Initializing database...[/cyan]")
try:
result = subprocess.run(
["uv", "run", "alembic", "upgrade", "head"],
capture_output=True,
text=True,
check=True
)
console.print("[green]✓[/green] Database schema created")
except subprocess.CalledProcessError as e:
console.print(f"[red]✗ Failed to create database schema[/red]")
console.print(f"Error: {e.stderr}")
raise typer.Exit(1)
except Exception as e:
console.print(f"[red]✗ Unexpected error: {e}[/red]")
raise typer.Exit(1)
# Success message
console.print(
Panel.fit(
"[bold green]✓ CandleKeep initialized successfully![/bold green]\n\n"
f"Database: {db_path}\n"
f"Library: {library_dir}\n"
f"Originals: {originals_dir}\n\n"
"You can now add books with: [cyan]candlekeep add-pdf <file>[/cyan]",
border_style="green"
)
)

skills/candlekeep/src/candlekeep/commands/query.py Normal file

@@ -0,0 +1,343 @@
"""Commands for querying books in the library."""
import re
from pathlib import Path
from typing import Optional, List
import typer
from rich.console import Console
from ..db.models import Book
from ..db.session import get_db_manager
from ..utils.config import get_config
console = Console()
app = typer.Typer()
def _format_book_for_llm(book: Book, full: bool = False, fields: Optional[List[str]] = None) -> str:
"""
Format a book's metadata in LLM-optimized text format.
Uses structured markdown with key-value pairs for easy parsing.
"""
lines = []
lines.append(f"## Book ID: {book.id}")
lines.append(f"Title: {book.title}")
# Essential fields (always shown)
if book.author:
lines.append(f"Author: {book.author}")
lines.append(f"Type: {book.source_type.value}")
if book.page_count:
lines.append(f"Pages: {book.page_count}")
if book.added_date:
lines.append(f"Added: {book.added_date.strftime('%Y-%m-%d %H:%M:%S')}")
# Additional fields based on flags
if full or (fields and 'category' in fields):
if book.category:
lines.append(f"Category: {book.category}")
if full or (fields and 'tags' in fields):
if book.tags:
lines.append(f"Tags: {', '.join(book.tags)}")
if full or (fields and 'word_count' in fields):
if book.word_count:
lines.append(f"Word Count: {book.word_count:,}")
if full or (fields and 'chapter_count' in fields):
if book.chapter_count:
lines.append(f"Chapters: {book.chapter_count}")
if full:
# Show all metadata
if book.subject:
lines.append(f"Subject: {book.subject}")
if book.keywords:
lines.append(f"Keywords: {book.keywords}")
if book.isbn:
lines.append(f"ISBN: {book.isbn}")
if book.publisher:
lines.append(f"Publisher: {book.publisher}")
if book.publication_year:
lines.append(f"Publication Year: {book.publication_year}")
if book.language:
lines.append(f"Language: {book.language}")
if book.pdf_creator:
lines.append(f"PDF Creator: {book.pdf_creator}")
if book.pdf_producer:
lines.append(f"PDF Producer: {book.pdf_producer}")
if book.pdf_creation_date:
lines.append(f"PDF Created: {book.pdf_creation_date.strftime('%Y-%m-%d %H:%M:%S')}")
if book.pdf_mod_date:
lines.append(f"PDF Modified: {book.pdf_mod_date.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"Original Path: {book.original_file_path}")
lines.append(f"Markdown Path: {book.markdown_file_path}")
return "\n".join(lines)
def _format_toc_for_llm(book: Book) -> str:
"""
Format a book's table of contents in LLM-optimized text format.
Uses hierarchical indentation for nested structure.
"""
lines = []
lines.append(f"## Table of Contents - Book ID: {book.id}")
lines.append(f"Title: {book.title}")
lines.append("")
if not book.table_of_contents:
lines.append("No table of contents available for this book.")
return "\n".join(lines)
# Format TOC entries with hierarchical indentation
for entry in book.table_of_contents:
level = entry.get('level', 1)
title = entry.get('title', 'Untitled')
page = entry.get('page', 'N/A')
# Indent based on level (2 spaces per level)
indent = " " * (level - 1)
lines.append(f"{indent}{title} (Page {page})")
return "\n".join(lines)
def _parse_page_ranges(page_str: str) -> List[int]:
"""
Parse page range string into list of page numbers.
Supports formats like:
- "1,2,3" -> [1, 2, 3]
- "1-5" -> [1, 2, 3, 4, 5]
- "1-5,10-15" -> [1, 2, 3, 4, 5, 10, 11, 12, 13, 14, 15]
"""
pages = set()
for part in page_str.split(','):
part = part.strip()
if '-' in part:
# Range
start, end = part.split('-', 1)
try:
start_num = int(start.strip())
end_num = int(end.strip())
pages.update(range(start_num, end_num + 1))
except ValueError:
raise ValueError(f"Invalid page range: {part}")
else:
# Single page
try:
pages.add(int(part))
except ValueError:
raise ValueError(f"Invalid page number: {part}")
return sorted(pages)
def _extract_pages_from_markdown(md_path: Path, pages: List[int]) -> str:
"""
Extract specific pages from a markdown file.
Uses page markers inserted during PDF/markdown processing.
Returns markdown content for requested pages.
"""
if not md_path.exists():
raise FileNotFoundError(f"Markdown file not found: {md_path}")
with open(md_path, 'r', encoding='utf-8') as f:
content = f.read()
# Find all page markers
# Pattern: --- end of page=N ---
page_pattern = re.compile(r'--- end of page=(\d+) ---')
matches = list(page_pattern.finditer(content))
if not matches:
# No page markers found - return entire content if page 1 is requested
if 1 in pages:
return content
else:
return ""
# Build a map of page numbers to content positions
# Note: "end of page=N" means content BEFORE this marker is page N
page_map = {}
for i, match in enumerate(matches):
page_num = int(match.group(1))
# Start position is just after the previous marker (or the start of the file for the first page)
if i == 0:
start_pos = 0
else:
start_pos = matches[i - 1].end()
# End position is at the current marker (before "--- end of page=N ---")
end_pos = match.start()
page_map[page_num] = (start_pos, end_pos)
# Extract requested pages
result_lines = []
for page_num in pages:
if page_num in page_map:
start, end = page_map[page_num]
page_content = content[start:end].strip()
result_lines.append(f"### Page {page_num}")
result_lines.append(page_content)
result_lines.append("") # Blank line separator
return "\n".join(result_lines)
@app.command("list")
def list_books(
full: bool = typer.Option(False, "--full", help="Show all metadata fields"),
fields: Optional[str] = typer.Option(None, "--fields", help="Comma-separated list of specific fields to show"),
):
"""
List all books in the library with metadata.
Output is optimized for LLM consumption with structured markdown format.
"""
try:
config = get_config()
# Check if CandleKeep is initialized
if not config.is_initialized:
console.print("Error: CandleKeep not initialized. Run 'candlekeep init' first.")
raise typer.Exit(1)
# Parse fields if provided
field_list = None
if fields:
field_list = [f.strip() for f in fields.split(',')]
# Get all books from database
db_manager = get_db_manager()
with db_manager.get_session() as session:
books = session.query(Book).order_by(Book.id).all()
if not books:
console.print("No books found in library.")
raise typer.Exit(0)
# Format output
output_lines = [f"# Library Books (Total: {len(books)})", ""]
for book in books:
book_text = _format_book_for_llm(book, full=full, fields=field_list)
output_lines.append(book_text)
output_lines.append("") # Blank line between books
# Print to stdout
print("\n".join(output_lines))
except typer.Exit:
raise
except Exception as e:
console.print(f"Error: {e}")
raise typer.Exit(1)
@app.command("toc")
def get_toc(
book_id: int = typer.Argument(..., help="Book ID to get table of contents for"),
):
"""
Get table of contents for a specific book.
Output is optimized for LLM consumption with hierarchical text format.
"""
try:
config = get_config()
# Check if CandleKeep is initialized
if not config.is_initialized:
console.print("Error: CandleKeep not initialized. Run 'candlekeep init' first.")
raise typer.Exit(1)
# Get book from database
db_manager = get_db_manager()
with db_manager.get_session() as session:
book = session.query(Book).filter(Book.id == book_id).first()
if not book:
console.print(f"Error: Book with ID {book_id} not found.")
raise typer.Exit(1)
# Format and print TOC
toc_text = _format_toc_for_llm(book)
print(toc_text)
except typer.Exit:
raise
except Exception as e:
console.print(f"Error: {e}")
raise typer.Exit(1)
@app.command("pages")
def get_pages(
book_id: int = typer.Argument(..., help="Book ID to get pages from"),
pages: str = typer.Option(..., "--pages", "-p", help="Page ranges (e.g., '1-5,10-15' or '1,2,3')"),
):
"""
Get specific pages from a book's markdown content.
Supports page ranges and multiple pages. Output is raw markdown content.
"""
try:
config = get_config()
# Check if CandleKeep is initialized
if not config.is_initialized:
console.print("Error: CandleKeep not initialized. Run 'candlekeep init' first.")
raise typer.Exit(1)
# Parse page ranges
try:
page_list = _parse_page_ranges(pages)
except ValueError as e:
console.print(f"Error: {e}")
raise typer.Exit(1)
# Get book from database
db_manager = get_db_manager()
with db_manager.get_session() as session:
book = session.query(Book).filter(Book.id == book_id).first()
if not book:
console.print(f"Error: Book with ID {book_id} not found.")
raise typer.Exit(1)
# Extract pages from markdown file
md_path = Path(book.markdown_file_path)
try:
content = _extract_pages_from_markdown(md_path, page_list)
if not content:
console.print(f"Warning: No content found for requested pages.")
raise typer.Exit(0)
# Print header and content
print(f"## Book ID: {book.id} - {book.title}")
print(f"Pages: {pages}")
print("")
print(content)
except FileNotFoundError as e:
console.print(f"Error: {e}")
raise typer.Exit(1)
except typer.Exit:
raise
except Exception as e:
console.print(f"Error: {e}")
raise typer.Exit(1)

skills/candlekeep/src/candlekeep/db/models.py Normal file

@@ -0,0 +1,123 @@
"""SQLAlchemy models for CandleKeep database."""
from datetime import datetime
from typing import Optional
from sqlalchemy import (
Column,
Integer,
String,
Text,
DateTime,
Enum,
ForeignKey,
Index,
JSON,
)
from sqlalchemy.orm import DeclarativeBase, relationship
import enum
class Base(DeclarativeBase):
"""Base class for all models."""
pass
class SourceType(enum.Enum):
"""Source type for books."""
PDF = "pdf"
MARKDOWN = "markdown"
class NoteType(enum.Enum):
"""Note type for book annotations."""
SUMMARY = "summary"
REVIEW = "review"
TAG = "tag"
OTHER = "other"
class Book(Base):
"""Book model - stores metadata only, content in markdown files."""
__tablename__ = "books"
# Primary key
id = Column(Integer, primary_key=True, autoincrement=True)
# Core metadata
title = Column(String(500), nullable=False, index=True)
author = Column(String(255), index=True)
# File information
original_file_path = Column(String(1000), nullable=False)
markdown_file_path = Column(String(1000), nullable=False)
source_type = Column(Enum(SourceType), nullable=False, index=True)
file_hash = Column(String(64), unique=True, nullable=False)
# Dates
added_date = Column(DateTime, default=datetime.utcnow, nullable=False)
modified_date = Column(
DateTime,
default=datetime.utcnow,
onupdate=datetime.utcnow,
nullable=False
)
# PDF-specific metadata
pdf_creation_date = Column(DateTime)
pdf_mod_date = Column(DateTime)
pdf_creator = Column(String(255))
pdf_producer = Column(String(255))
# Content metrics
page_count = Column(Integer)
word_count = Column(Integer)
chapter_count = Column(Integer)
table_of_contents = Column(JSON) # List of TOC entries with level, title, page
# Categorization
subject = Column(String(500))
keywords = Column(Text) # Comma-separated
category = Column(String(100), index=True)
tags = Column(JSON) # List of tags
# Additional info
isbn = Column(String(20))
publisher = Column(String(255))
publication_year = Column(Integer)
language = Column(String(10), default="en")
# Relationships
notes = relationship("BookNote", back_populates="book", cascade="all, delete-orphan")
def __repr__(self):
return f"<Book(id={self.id}, title='{self.title}', author='{self.author}')>"
class BookNote(Base):
"""Book notes and annotations."""
__tablename__ = "book_notes"
# Primary key
id = Column(Integer, primary_key=True, autoincrement=True)
# Foreign key
book_id = Column(Integer, ForeignKey("books.id", ondelete="CASCADE"), nullable=False)
# Note data
note_type = Column(Enum(NoteType), default=NoteType.OTHER, nullable=False)
content = Column(Text, nullable=False)
created_date = Column(DateTime, default=datetime.utcnow, nullable=False)
# Relationships
book = relationship("Book", back_populates="notes")
# Indexes
__table_args__ = (
Index("idx_book_type", "book_id", "note_type"),
)
def __repr__(self):
return f"<BookNote(id={self.id}, book_id={self.book_id}, type={self.note_type})>"

skills/candlekeep/src/candlekeep/db/session.py Normal file

@@ -0,0 +1,101 @@
"""Database session management for CandleKeep."""
from pathlib import Path
from typing import Optional
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from .models import Base
def get_db_path() -> Path:
"""Get the database file path.
Returns:
Path to SQLite database file
"""
return Path.home() / ".candlekeep" / "candlekeep.db"
def get_connection_string() -> str:
"""Get SQLite connection string.
Returns:
SQLAlchemy connection string for SQLite
"""
db_path = get_db_path()
return f"sqlite:///{db_path}"
class DatabaseManager:
"""Manages database connections and sessions."""
def __init__(self):
"""Initialize database manager with SQLite."""
self.db_path = get_db_path()
self.connection_string = get_connection_string()
self.engine = None
self.SessionLocal = None
def connect(self):
"""Create database engine and session factory."""
# Ensure database directory exists
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.engine = create_engine(
self.connection_string,
connect_args={"check_same_thread": False}, # For SQLite
echo=False, # Set to True for SQL debugging
)
self.SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=self.engine
)
@contextmanager
def get_session(self):
"""Get a database session with automatic cleanup.
Yields:
Session: SQLAlchemy session
"""
if self.SessionLocal is None:
raise RuntimeError("Database not connected. Call connect() first.")
session = self.SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def close(self):
"""Close database connection."""
if self.engine:
self.engine.dispose()
# Global database manager instance
_db_manager: Optional[DatabaseManager] = None
def get_db_manager() -> DatabaseManager:
"""Get the global database manager instance.
    Returns:
        DatabaseManager instance (created and connected on first call)
"""
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
_db_manager.connect()
return _db_manager
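For example, a sketch of typical use (note this touches the real ~/.candlekeep directory; the import paths are assumptions based on the package layout):

from candlekeep.database.session import get_db_manager  # import path assumed
from candlekeep.database.models import Book             # import path assumed

db = get_db_manager()  # creates the directory and connects on first call
with db.get_session() as session:
    count = session.query(Book).count()
    print(f"{count} books in the library")
# get_session() commits on clean exit and rolls back on exception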

View File

@@ -0,0 +1,330 @@
"""Markdown parsing and metadata extraction."""
import re
from pathlib import Path
from typing import Dict, Optional, Any, List
import frontmatter
from ..utils.file_utils import parse_filename_metadata
class MarkdownParser:
"""Parser for extracting metadata and content from markdown files."""
def __init__(self, md_path: Path):
"""
Initialize Markdown parser.
Args:
md_path: Path to markdown file
Raises:
FileNotFoundError: If markdown file doesn't exist
ValueError: If file cannot be read
"""
self.md_path = Path(md_path)
if not self.md_path.exists():
raise FileNotFoundError(f"Markdown file not found: {self.md_path}")
try:
with open(self.md_path, 'r', encoding='utf-8') as f:
self.post = frontmatter.load(f)
except Exception as e:
raise ValueError(f"Failed to read markdown file: {e}")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
pass
def extract_metadata(self) -> Dict[str, Any]:
"""
Extract all metadata from markdown file.
Priority order for metadata:
1. YAML frontmatter
2. First heading (for title)
3. Filename parsing
4. Defaults
Returns:
Dictionary containing all extracted metadata
"""
metadata = {}
# Extract frontmatter metadata
frontmatter_data = self._extract_frontmatter_metadata()
metadata.update(frontmatter_data)
# Get content (without frontmatter)
content = self.post.content
# If title missing, try first heading
if not metadata.get('title'):
first_heading = self._extract_first_heading(content)
if first_heading:
metadata['title'] = first_heading
# If title or author still missing, try filename parsing
if not metadata.get('title') or not metadata.get('author'):
filename_title, filename_author = parse_filename_metadata(self.md_path.name)
if not metadata.get('title') and filename_title:
metadata['title'] = filename_title
if not metadata.get('author') and filename_author:
metadata['author'] = filename_author
# If still no title, use filename
if not metadata.get('title'):
metadata['title'] = self.md_path.stem
# Extract or generate table of contents
toc = self._extract_or_generate_toc(content)
metadata['table_of_contents'] = toc
metadata['chapter_count'] = len(toc)
        # Count words
metadata['word_count'] = self.count_words(content)
# Store full content
metadata['content'] = content
return metadata
def _extract_frontmatter_metadata(self) -> Dict[str, Any]:
"""
Extract metadata from YAML frontmatter.
Returns:
Dictionary of frontmatter metadata
"""
        metadata = {}

        def get_str(key: str) -> str:
            """Return the stripped frontmatter value for key, or '' if missing or not a string."""
            value = self.post.get(key)
            return value.strip() if isinstance(value, str) else ''

        # Simple string fields (kept only when present and non-empty)
        for field in ('title', 'author', 'subject', 'keywords',
                      'category', 'isbn', 'publisher', 'language'):
            if value := get_str(field):
                metadata[field] = value

        # Tags (can be a list or a comma-separated string)
        tags = self.post.get('tags', [])
        if isinstance(tags, list) and tags:
            # Keep the list as-is
            metadata['tags'] = tags
        elif isinstance(tags, str) and tags:
            # Parse comma-separated string into a list
            metadata['tags'] = [tag.strip() for tag in tags.split(',')]

        # Publication year
        if year := self.post.get('publication_year'):
            try:
                metadata['publication_year'] = int(year)
            except (ValueError, TypeError):
                pass

        # Table of contents from frontmatter (if it exists)
        if toc := self.post.get('toc') or self.post.get('table_of_contents'):
            if isinstance(toc, list):
                metadata['frontmatter_toc'] = toc

        return metadata
def _extract_first_heading(self, content: str) -> Optional[str]:
"""
Extract title from first # heading in content.
Args:
content: Markdown content
Returns:
First heading text or None
"""
# Match first level-1 heading (# Title)
match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
return match.group(1).strip() if match else None
def _extract_or_generate_toc(self, content: str) -> List[Dict[str, Any]]:
"""
Extract TOC from frontmatter or generate from headings.
Strategy:
1. Check frontmatter for 'toc' or 'table_of_contents' field
2. If found and valid, use it
3. Otherwise, generate from markdown headings
Args:
content: Markdown content
Returns:
List of TOC entries with level, title, and page (0 for markdown)
"""
# Check if frontmatter has TOC
frontmatter_toc = self.post.get('toc') or self.post.get('table_of_contents')
if frontmatter_toc and isinstance(frontmatter_toc, list):
# Validate and normalize frontmatter TOC
return self._normalize_frontmatter_toc(frontmatter_toc)
# Generate TOC from headings
return self._generate_toc_from_headings(content)
def _normalize_frontmatter_toc(self, toc_data: List) -> List[Dict[str, Any]]:
"""
Normalize frontmatter TOC to standard format.
Args:
toc_data: TOC from frontmatter
Returns:
Normalized TOC entries
"""
normalized = []
for entry in toc_data:
if isinstance(entry, dict):
normalized.append({
'level': entry.get('level', 1),
'title': entry.get('title', '').strip(),
'page': entry.get('page', 0) # Markdown doesn't have pages
})
elif isinstance(entry, str):
# Simple string entry, assume level 1
normalized.append({
'level': 1,
'title': entry.strip(),
'page': 0
})
return normalized
def _generate_toc_from_headings(self, content: str) -> List[Dict[str, Any]]:
"""
Generate TOC from markdown headings.
Extracts all headings (##, ###, etc.) and creates TOC structure
matching the PDF parser format (level, title, page).
Args:
content: Markdown content
Returns:
List of TOC entries
"""
toc_entries = []
# Pattern to match headings: ##, ###, ####, etc. (not # as that's the title)
# Captures: heading level (number of #) and heading text
heading_pattern = re.compile(r'^(#{2,6})\s+(.+)$', re.MULTILINE)
for match in heading_pattern.finditer(content):
hashes = match.group(1)
title = match.group(2).strip()
# Remove markdown links, bold, italic from title
title = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', title) # [text](url) -> text
title = re.sub(r'\*\*([^\*]+)\*\*', r'\1', title) # **bold** -> bold
title = re.sub(r'\*([^\*]+)\*', r'\1', title) # *italic* -> italic
title = re.sub(r'`([^`]+)`', r'\1', title) # `code` -> code
toc_entries.append({
'level': len(hashes), # ## = 2, ### = 3, etc.
'title': title,
'page': 0 # Markdown files don't have page numbers
})
return toc_entries
def count_words(self, text: str) -> int:
"""
Count words in text.
Args:
text: Text to count words in
Returns:
Word count
"""
        # Remove links first, while the bracket syntax is still intact
        clean_text = re.sub(r'!\[([^\]]*)\]\([^\)]+\)', '', text)           # images: drop entirely
        clean_text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', clean_text)   # links: keep link text
        # Then remove remaining markdown syntax for a more accurate count
        clean_text = re.sub(r'[#*`\[\]()]', ' ', clean_text)
        # Remove horizontal rules
        clean_text = re.sub(r'^---+$', '', clean_text, flags=re.MULTILINE)
# Split and count
words = clean_text.split()
return len(words)
def count_headings(self, text: str) -> int:
"""
Count level-2 headings (##) in markdown.
Args:
text: Markdown text
Returns:
Number of ## headings
"""
headings = re.findall(r'^##\s+.+$', text, re.MULTILINE)
return len(headings)
def parse_markdown(md_path: Path) -> Dict[str, Any]:
"""
Parse markdown file and extract all metadata and content.
Args:
md_path: Path to markdown file
Returns:
Dictionary containing:
- All metadata fields (title, author, etc.)
- content: Full markdown content (without frontmatter)
- word_count: Number of words
- chapter_count: Number of TOC entries
- table_of_contents: List of TOC entries
Raises:
FileNotFoundError: If markdown file doesn't exist
ValueError: If file cannot be read or parsed
Example:
>>> metadata = parse_markdown(Path('my-book.md'))
>>> print(metadata['title'])
'My Coding Philosophy'
>>> print(f"Chapters: {metadata['chapter_count']}")
Chapters: 5
"""
with MarkdownParser(md_path) as parser:
return parser.extract_metadata()
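To make the priority order concrete, a small self-contained sketch (file location hypothetical):

from pathlib import Path

sample = """---
title: My Coding Philosophy
author: Jane Doe
tags: [craft, testing]
---

# My Coding Philosophy

## Principles

## Practices
"""
path = Path("/tmp/my-book.md")  # hypothetical location
path.write_text(sample, encoding="utf-8")

meta = parse_markdown(path)
print(meta["title"])          # 'My Coding Philosophy' (frontmatter wins)
print(meta["tags"])           # ['craft', 'testing']
print(meta["chapter_count"])  # 2 -- generated from the ## headings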

View File

@@ -0,0 +1,251 @@
"""PDF parsing and metadata extraction."""
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Any, List
import fitz # PyMuPDF
import pymupdf4llm
from ..utils.file_utils import parse_filename_metadata
class PDFParser:
"""Parser for extracting metadata and content from PDF files."""
def __init__(self, pdf_path: Path):
"""
Initialize PDF parser.
Args:
pdf_path: Path to PDF file
Raises:
FileNotFoundError: If PDF doesn't exist
ValueError: If file is not a valid PDF
"""
self.pdf_path = Path(pdf_path)
if not self.pdf_path.exists():
raise FileNotFoundError(f"PDF not found: {self.pdf_path}")
try:
self.doc = fitz.open(str(self.pdf_path))
except Exception as e:
raise ValueError(f"Invalid PDF file: {e}")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - close document."""
self.doc.close()
def extract_metadata(self) -> Dict[str, Any]:
"""
Extract all metadata from PDF.
Returns:
Dictionary containing all extracted metadata
"""
metadata = {}
# Extract embedded PDF metadata
embedded = self._extract_embedded_metadata()
metadata.update(embedded)
# Extract table of contents
toc = self._extract_table_of_contents()
metadata['chapter_count'] = len(toc)
metadata['table_of_contents'] = toc
# Page count
metadata['page_count'] = len(self.doc)
# If title or author missing, try filename parsing
if not metadata.get('title') or not metadata.get('author'):
filename_title, filename_author = parse_filename_metadata(self.pdf_path.name)
if not metadata.get('title') and filename_title:
metadata['title'] = filename_title
if not metadata.get('author') and filename_author:
metadata['author'] = filename_author
# If still no title, use filename
if not metadata.get('title'):
metadata['title'] = self.pdf_path.stem
return metadata
def _extract_embedded_metadata(self) -> Dict[str, Any]:
"""
Extract metadata embedded in PDF.
Returns:
Dictionary of embedded metadata
"""
pdf_metadata = self.doc.metadata
metadata = {}
        # Title
        if title := (pdf_metadata.get('title') or '').strip():
            metadata['title'] = title
        # Author
        if author := (pdf_metadata.get('author') or '').strip():
            metadata['author'] = author
        # Subject
        if subject := (pdf_metadata.get('subject') or '').strip():
            metadata['subject'] = subject
        # Keywords
        if keywords := (pdf_metadata.get('keywords') or '').strip():
            metadata['keywords'] = keywords
        # Creator (software that created the original document)
        if creator := (pdf_metadata.get('creator') or '').strip():
            metadata['pdf_creator'] = creator
        # Producer (software that produced the PDF)
        if producer := (pdf_metadata.get('producer') or '').strip():
            metadata['pdf_producer'] = producer
        # Creation date ("or ''" guards against None metadata values)
        if creation_date := (pdf_metadata.get('creationDate') or '').strip():
            metadata['pdf_creation_date'] = self._parse_pdf_date(creation_date)
        # Modification date
        if mod_date := (pdf_metadata.get('modDate') or '').strip():
            metadata['pdf_mod_date'] = self._parse_pdf_date(mod_date)
return metadata
def _parse_pdf_date(self, date_str: str) -> Optional[datetime]:
"""
Parse PDF date format to datetime.
        PDF dates are in the format D:YYYYMMDDHHmmSSOHH'mm, where O is the
        timezone sign (+, -, or Z).
        Example: D:20230101120000+00'00
Args:
date_str: PDF date string
Returns:
datetime object or None if parsing fails
"""
if not date_str:
return None
try:
# Remove D: prefix if present
if date_str.startswith('D:'):
date_str = date_str[2:]
# Extract just the date/time part (ignore timezone for simplicity)
date_part = date_str[:14] # YYYYMMDDHHmmSS
# Parse to datetime
return datetime.strptime(date_part, '%Y%m%d%H%M%S')
except (ValueError, IndexError):
return None
def _extract_table_of_contents(self) -> List[Dict[str, Any]]:
"""
Extract table of contents from PDF.
Returns:
List of TOC entries with level, title, and page
"""
toc = self.doc.get_toc()
toc_entries = []
for entry in toc:
level, title, page = entry
toc_entries.append({
'level': level,
'title': title.strip(),
'page': page
})
return toc_entries
def convert_to_markdown(self) -> str:
"""
Convert PDF to markdown using pymupdf4llm with page separators.
Returns:
Markdown content as string with page markers (--- end of page=N ---)
"""
try:
# Use pymupdf4llm for conversion with page separators
md_text = pymupdf4llm.to_markdown(
str(self.pdf_path),
page_separators=True # Add page markers for content extraction
)
return md_text
except Exception as e:
raise ValueError(f"Failed to convert PDF to markdown: {e}")
def count_words(self, text: str) -> int:
"""
Count words in text.
Args:
text: Text to count words in
Returns:
Word count
"""
# Remove markdown syntax for more accurate count
clean_text = re.sub(r'[#*`\[\]()]', ' ', text)
words = clean_text.split()
return len(words)
def extract_first_page_text(self) -> str:
"""
Extract text from first page (for fallback metadata extraction).
Returns:
First page text
"""
if len(self.doc) == 0:
return ""
first_page = self.doc[0]
return first_page.get_text()
def parse_pdf(
pdf_path: Path,
convert_to_md: bool = True
) -> Dict[str, Any]:
"""
Parse PDF and extract all metadata and content.
Args:
pdf_path: Path to PDF file
convert_to_md: Whether to convert to markdown (default: True)
Returns:
Dictionary containing:
- All metadata fields
- markdown_content (if convert_to_md=True)
- word_count (if convert_to_md=True)
Raises:
FileNotFoundError: If PDF doesn't exist
ValueError: If PDF is invalid or conversion fails
"""
with PDFParser(pdf_path) as parser:
# Extract metadata
metadata = parser.extract_metadata()
# Convert to markdown if requested
if convert_to_md:
markdown_content = parser.convert_to_markdown()
metadata['markdown_content'] = markdown_content
metadata['word_count'] = parser.count_words(markdown_content)
return metadata
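For instance (the path is hypothetical, and running this requires PyMuPDF and pymupdf4llm to be installed):

from pathlib import Path

meta = parse_pdf(Path("/tmp/some-book.pdf"))  # hypothetical file
print(meta.get("title"), "by", meta.get("author"))
print(f"{meta['page_count']} pages, ~{meta.get('word_count', 0)} words")
for entry in meta["table_of_contents"][:5]:
    print("  " * (entry["level"] - 1) + f"{entry['title']} (p. {entry['page']})")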

View File

@@ -0,0 +1,129 @@
"""Configuration management for CandleKeep."""
import os
from pathlib import Path
from typing import Optional, Dict, Any
import yaml
class Config:
"""CandleKeep configuration manager."""
def __init__(self, config_dir: Optional[Path] = None):
"""Initialize configuration manager.
Args:
config_dir: Configuration directory (default: ~/.candlekeep)
"""
self.config_dir = config_dir or Path.home() / ".candlekeep"
self.config_file = self.config_dir / "config.yaml"
self.library_dir = self.config_dir / "library"
self.originals_dir = self.config_dir / "originals"
self._config_data: Optional[Dict[str, Any]] = None
def exists(self) -> bool:
"""Check if configuration file exists.
Returns:
True if config file exists
"""
return self.config_file.exists()
def load(self) -> Dict[str, Any]:
"""Load configuration from file.
Returns:
Configuration dictionary
Raises:
FileNotFoundError: If config file doesn't exist
"""
if not self.exists():
raise FileNotFoundError(
f"Configuration file not found: {self.config_file}\n"
"Run 'candlekeep init' to create configuration."
)
with open(self.config_file, "r") as f:
self._config_data = yaml.safe_load(f)
return self._config_data
def save(self, config_data: Dict[str, Any]):
"""Save configuration to file.
Args:
config_data: Configuration dictionary to save
"""
# Create config directory if it doesn't exist
self.config_dir.mkdir(parents=True, exist_ok=True)
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False, sort_keys=False)
self._config_data = config_data
def get_database_config(self) -> Dict[str, Any]:
"""Get database configuration.
Returns:
Database configuration dictionary
"""
if self._config_data is None:
self.load()
return self._config_data.get("database", {})
def get_connection_string(self) -> str:
"""Get MySQL connection string.
Returns:
SQLAlchemy connection string
"""
db_config = self.get_database_config()
user = db_config.get("user")
password = db_config.get("password")
host = db_config.get("host", "localhost")
port = db_config.get("port", 3306)
database = db_config.get("database", "candlekeep")
return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
def create_directories(self):
"""Create all required directories."""
self.config_dir.mkdir(parents=True, exist_ok=True)
self.library_dir.mkdir(parents=True, exist_ok=True)
self.originals_dir.mkdir(parents=True, exist_ok=True)
@property
def is_initialized(self) -> bool:
"""Check if CandleKeep is initialized.
Returns:
True if directories are set up and database exists
"""
db_path = self.config_dir / "candlekeep.db"
return (
self.config_dir.exists()
and self.library_dir.exists()
and self.originals_dir.exists()
and db_path.exists()
)
# Global configuration instance
_config: Optional[Config] = None
def get_config() -> Config:
"""Get the global configuration instance.
Returns:
Config instance
"""
global _config
if _config is None:
_config = Config()
return _config
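A first-run sketch (the credentials are illustrative; only the `database` section shown here is read back by `get_database_config()`):

config = get_config()
if not config.exists():
    config.create_directories()
    config.save({
        "database": {
            "user": "candlekeep",   # illustrative values
            "password": "secret",
            "host": "localhost",
            "port": 3306,
            "database": "candlekeep",
        }
    })
print(config.get_connection_string())
# mysql+pymysql://candlekeep:secret@localhost:3306/candlekeep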

View File

@@ -0,0 +1,178 @@
"""Content extraction utilities for markdown files with page markers."""
import re
from typing import Optional, Tuple
def extract_pages_from_markdown(
markdown_text: str,
start_page: int,
end_page: Optional[int] = None
) -> str:
"""
Extract content between specified pages from markdown with page markers.
The markdown must contain page markers in the format:
--- end of page=N ---
Args:
markdown_text: Markdown content with page markers
start_page: Starting page number (1-indexed)
end_page: Ending page number (1-indexed, inclusive). If None, extracts to end.
Returns:
Extracted markdown content for the specified page range
Examples:
# Extract just page 41
content = extract_pages_from_markdown(md, 41, 41)
# Extract pages 41-45
content = extract_pages_from_markdown(md, 41, 45)
# Extract from page 41 to end
content = extract_pages_from_markdown(md, 41)
"""
# Pattern to match page markers: --- end of page=N ---
pattern = r'--- end of page=(\d+) ---'
# Find all page markers and their positions
markers = []
for match in re.finditer(pattern, markdown_text):
page_num = int(match.group(1))
markers.append({
'page': page_num,
'start': match.start(),
'end': match.end()
})
# If no markers found, return empty or full text
if not markers:
return markdown_text if start_page == 1 else ""
# Find start position (content after start_page-1's marker)
start_pos = 0
for marker in markers:
if marker['page'] == start_page - 1:
start_pos = marker['end']
break
# Find end position (before end_page's marker or end of text)
end_pos = len(markdown_text)
if end_page is not None:
for marker in markers:
if marker['page'] == end_page:
end_pos = marker['start']
break
# Extract and clean up the content
content = markdown_text[start_pos:end_pos].strip()
return content
def get_page_range_for_toc_entry(
toc: list,
entry_index: int
) -> Tuple[int, Optional[int]]:
"""
Get the page range for a TOC entry.
Args:
toc: List of TOC entries (each with 'level', 'title', 'page')
entry_index: Index of the TOC entry to get range for
Returns:
        Tuple of (start_page, end_page) for the TOC entry; end_page is None
        when the entry runs to the end of the document
Examples:
# Get page range for "Goblins" section
toc = [
{'level': 2, 'title': 'Goblinoids', 'page': 41},
{'level': 3, 'title': 'Goblins', 'page': 41},
{'level': 3, 'title': 'Hobgoblins', 'page': 46},
]
start, end = get_page_range_for_toc_entry(toc, 1)
# Returns: (41, 45) - from Goblins to just before Hobgoblins
"""
if entry_index < 0 or entry_index >= len(toc):
raise ValueError(f"Invalid TOC entry index: {entry_index}")
entry = toc[entry_index]
start_page = entry['page']
entry_level = entry['level']
# Find the end page by looking for the next entry at same or higher level
end_page = None
for i in range(entry_index + 1, len(toc)):
next_entry = toc[i]
# If we find an entry at same or higher level (lower number), that's our end
if next_entry['level'] <= entry_level:
end_page = next_entry['page'] - 1
break
# If no next section found, use None (extract to end)
return (start_page, end_page)
def extract_toc_section(
markdown_text: str,
toc: list,
entry_index: int
) -> str:
"""
Extract content for a specific TOC entry.
Convenience function that combines get_page_range_for_toc_entry
and extract_pages_from_markdown.
Args:
markdown_text: Markdown content with page markers
toc: List of TOC entries
entry_index: Index of the TOC entry to extract
Returns:
Markdown content for the TOC section
Examples:
# Extract "Goblins" section
content = extract_toc_section(md_text, toc, 1)
"""
start_page, end_page = get_page_range_for_toc_entry(toc, entry_index)
return extract_pages_from_markdown(markdown_text, start_page, end_page)
def find_toc_entry_by_title(
toc: list,
search_term: str,
case_sensitive: bool = False
) -> Optional[int]:
"""
Find a TOC entry index by searching for a title.
Args:
toc: List of TOC entries
search_term: Text to search for in TOC titles
case_sensitive: Whether search should be case sensitive
Returns:
Index of first matching TOC entry, or None if not found
Examples:
# Find "Goblins" section
index = find_toc_entry_by_title(toc, "Goblins")
if index is not None:
content = extract_toc_section(md_text, toc, index)
"""
if not case_sensitive:
search_term = search_term.lower()
for i, entry in enumerate(toc):
title = entry['title']
if not case_sensitive:
title = title.lower()
if search_term in title:
return i
return None
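Putting the three helpers together on a toy document (page markers in the same `--- end of page=N ---` form the PDF conversion emits):

md_text = (
    "Goblin lore...\n--- end of page=41 ---\n"
    "More goblins...\n--- end of page=45 ---\n"
    "Hobgoblin lore...\n--- end of page=46 ---\n"
)
toc = [
    {'level': 2, 'title': 'Goblinoids', 'page': 41},
    {'level': 3, 'title': 'Goblins', 'page': 41},
    {'level': 3, 'title': 'Hobgoblins', 'page': 46},
]

index = find_toc_entry_by_title(toc, "goblins")  # case-insensitive -> 1
content = extract_toc_section(md_text, toc, index)
print(content)  # pages 41-45: the Goblins section, ending before Hobgoblins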

View File

@@ -0,0 +1,130 @@
"""File handling utilities for CandleKeep."""
import re
from pathlib import Path
from typing import Optional, Tuple
def sanitize_filename(filename: str, max_length: int = 200) -> str:
"""
Sanitize a filename for safe filesystem storage.
Args:
filename: Original filename
max_length: Maximum length for filename (default: 200)
Returns:
        Lowercased, sanitized filename safe for all filesystems
"""
# Remove file extension
name = Path(filename).stem
# Replace problematic characters with hyphens
name = re.sub(r'[<>:"/\\|?*]', '-', name)
# Replace multiple spaces/hyphens with single hyphen
name = re.sub(r'[-\s]+', '-', name)
# Remove leading/trailing hyphens and spaces
name = name.strip('- ')
# Truncate if too long
if len(name) > max_length:
name = name[:max_length].rstrip('- ')
# Ensure not empty
if not name:
name = "untitled"
return name.lower()
def parse_filename_metadata(filename: str) -> Tuple[Optional[str], Optional[str]]:
"""
Extract title and author from filename patterns.
Common patterns:
- "Title - Author.pdf"
- "Title by Author.pdf"
- "Author - Title.pdf"
- "Title (Author).pdf"
Args:
filename: Filename to parse
Returns:
Tuple of (title, author) - either may be None
"""
# Remove extension
name = Path(filename).stem
title = None
author = None
# Pattern 1: "Title - Author" or "Author - Title"
if ' - ' in name:
parts = name.split(' - ', 1)
# Heuristic: if first part has common author patterns, it's author first
if any(indicator in parts[0].lower() for indicator in ['dr.', 'prof.', 'jr.', 'sr.']):
author = parts[0].strip()
title = parts[1].strip()
else:
title = parts[0].strip()
author = parts[1].strip()
# Pattern 2: "Title by Author"
elif ' by ' in name.lower():
parts = re.split(r'\s+by\s+', name, maxsplit=1, flags=re.IGNORECASE)
title = parts[0].strip()
author = parts[1].strip() if len(parts) > 1 else None
# Pattern 3: "Title (Author)"
elif match := re.match(r'^(.+?)\s*\(([^)]+)\)\s*$', name):
title = match.group(1).strip()
author = match.group(2).strip()
# Pattern 4: Just use filename as title
else:
title = name.strip()
return (title if title else None, author if author else None)
def ensure_directory(directory: Path) -> None:
"""
Ensure directory exists, create if it doesn't.
Args:
directory: Path to directory
"""
directory.mkdir(parents=True, exist_ok=True)
def get_unique_filename(directory: Path, base_name: str, extension: str) -> Path:
"""
Generate a unique filename by appending numbers if file exists.
Args:
directory: Target directory
base_name: Base filename without extension
extension: File extension (with or without dot)
Returns:
Path to unique filename
"""
# Ensure extension starts with dot
if not extension.startswith('.'):
extension = f'.{extension}'
filepath = directory / f"{base_name}{extension}"
if not filepath.exists():
return filepath
# File exists, add counter
counter = 1
while True:
filepath = directory / f"{base_name}-{counter}{extension}"
if not filepath.exists():
return filepath
counter += 1
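For example:

print(sanitize_filename('The "Best" Book: 2nd Edition.pdf'))
# the-best-book-2nd-edition

print(parse_filename_metadata("Clean Code by Robert C. Martin.pdf"))
# ('Clean Code', 'Robert C. Martin')

print(parse_filename_metadata("Deep Work (Cal Newport).pdf"))
# ('Deep Work', 'Cal Newport')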

View File

@@ -0,0 +1,50 @@
"""File hashing utilities for duplicate detection."""
import hashlib
from pathlib import Path
from typing import Union
def compute_file_hash(file_path: Union[str, Path]) -> str:
"""
Compute SHA256 hash of a file for duplicate detection.
Args:
file_path: Path to the file
Returns:
SHA256 hash as hexadecimal string
Raises:
FileNotFoundError: If file doesn't exist
IOError: If file cannot be read
"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
if not file_path.is_file():
raise ValueError(f"Not a file: {file_path}")
sha256_hash = hashlib.sha256()
# Read file in chunks to handle large files efficiently
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def compute_string_hash(text: str) -> str:
"""
Compute SHA256 hash of a string.
Args:
text: String to hash
Returns:
SHA256 hash as hexadecimal string
"""
return hashlib.sha256(text.encode('utf-8')).hexdigest()
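A sketch of the duplicate check this enables, matching against the unique `Book.file_hash` column (paths and import locations are assumptions):

from pathlib import Path
from candlekeep.database.models import Book             # import path assumed
from candlekeep.database.session import get_db_manager  # import path assumed

incoming = Path("/tmp/new-book.pdf")  # hypothetical upload
digest = compute_file_hash(incoming)

with get_db_manager().get_session() as session:
    existing = session.query(Book).filter_by(file_hash=digest).first()
    if existing:
        print(f"Duplicate of book #{existing.id}: {existing.title}")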