name: substack fetch
on:
  schedule:
    - cron: '0 0 * * *' # Daily at midnight UTC
  workflow_dispatch:
jobs:
  fetch-substack-posts:
    name: Fetch latest blog posts from Substack
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3
        with:
          ref: master
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install feedparser requests beautifulsoup4 fake-useragent selenium webdriver-manager cloudscraper
      - name: Setup Chrome
        uses: browser-actions/setup-chrome@latest
        with:
          chrome-version: stable
      - name: Run script to fetch Substack posts
        env:
          RSS2JSON_API_KEY: ${{ secrets.RSS2JSON_KEY }}
        uses: jannekem/run-python-script-action@v1
        with:
          script: |
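            # The script below tries four fetch strategies in order (RSS2JSON API,
            # cloudscraper, free rotating proxies, headless Selenium) and stops at the
            # first one that returns feed entries. It expects the RSS2JSON_KEY repository
            # secret (exposed above as RSS2JSON_API_KEY); without it the RSS2JSON method
            # simply reports that no key was found and the remaining methods are tried.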
            import feedparser
            import requests
            import random
            import os
            import time
            import json
            import cloudscraper
            from fake_useragent import UserAgent
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.common.by import By
            from selenium.webdriver.support.ui import WebDriverWait
            from selenium.webdriver.support import expected_conditions as EC
            from webdriver_manager.chrome import ChromeDriverManager
            ua = UserAgent()
            def get_free_proxies():
                """Get free proxies from multiple sources"""
                proxies = []
                # Try proxy-list.download
                try:
                    print("Fetching proxies from proxy-list.download...")
                    scraper = cloudscraper.create_scraper()
                    response = scraper.get('https://www.proxy-list.download/api/v1/get?type=http', timeout=10)
                    proxy_list = response.text.strip().split('\n')
                    for proxy in proxy_list[:20]: # Take first 20
                        if ':' in proxy:
                            proxies.append({
                                'http': f'http://{proxy}',
                                'https': f'http://{proxy}'
                            })
                    print(f"Got {len(proxies)} proxies from proxy-list.download")
                except Exception as e:
                    print(f"Failed to get proxies from proxy-list.download: {e}")
                # Backup proxy source
                try:
                    print("Fetching proxies from pubproxy...")
                    response = requests.get('http://pubproxy.com/api/proxy?limit=20&format=json&type=http', timeout=10)
                    data = response.json()
                    for proxy in data.get('data', []):
                        ip_port = f"{proxy['ip']}:{proxy['port']}"
                        proxies.append({
                            'http': f'http://{ip_port}',
                            'https': f'http://{ip_port}'
                        })
                    print(f"Added {len(data.get('data', []))} proxies from pubproxy")
                except Exception as e:
                    print(f"Failed to get proxies from pubproxy: {e}")
                return proxies
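            # Each proxy is returned as a requests/cloudscraper-style mapping, e.g.
            # {'http': 'http://1.2.3.4:8080', 'https': 'http://1.2.3.4:8080'}, so it can
            # be passed straight to scraper.proxies.update() in get_blog_info_with_proxies().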
            def get_blog_info_cloudscraper(feed_url, num_entries=20):
                """Use cloudscraper to bypass CloudFlare protection"""
                print("Trying cloudscraper...")
                try:
                    scraper = cloudscraper.create_scraper(
                        browser={
                            'browser': 'chrome',
                            'platform': 'windows',
                            'desktop': True
                        }
                    )
                    # Add some realistic headers
                    scraper.headers.update({
                        'Accept': 'application/rss+xml, application/xml, text/xml, */*',
                        'Accept-Language': 'en-US,en;q=0.9',
                        'Accept-Encoding': 'gzip, deflate, br',
                        'Connection': 'keep-alive',
                        'Upgrade-Insecure-Requests': '1',
                        'Sec-Fetch-Dest': 'document',
                        'Sec-Fetch-Mode': 'navigate',
                        'Sec-Fetch-Site': 'none',
                        'Cache-Control': 'max-age=0'
                    })
                    response = scraper.get(feed_url, timeout=30)
                    response.raise_for_status()
                    feed = feedparser.parse(response.content)
                    if not feed.entries:
                        print("No entries found in feed")
                        return []
                    entries = []
                    for entry in feed.entries[:num_entries]:
                        entries.append({
                            "title": entry.title,
                            "link": entry.link
                        })
                    print(f"Successfully fetched {len(entries)} entries with cloudscraper")
                    return entries
                except Exception as e:
                    print(f"Cloudscraper failed: {e}")
                    return []
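            # cloudscraper presents a Chrome-on-Windows browser profile and handles
            # Cloudflare's JavaScript challenge, so the raw XML response can be handed
            # directly to feedparser.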
            def get_blog_info_selenium(feed_url, num_entries=20):
                """Use Selenium to fetch RSS feed"""
                print("Trying Selenium...")
                driver = None
                try:
                    # Setup Chrome options
                    chrome_options = Options()
                    chrome_options.add_argument('--headless')
                    chrome_options.add_argument('--no-sandbox')
                    chrome_options.add_argument('--disable-dev-shm-usage')
                    chrome_options.add_argument('--disable-gpu')
                    chrome_options.add_argument('--window-size=1920,1080')
                    chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
                    # Create driver
                    driver = webdriver.Chrome(options=chrome_options)
                    # Get the RSS feed
                    driver.get(feed_url)
                    time.sleep(5) # Wait for page to load
                    # Get page source
                    page_source = driver.page_source
                    # Parse with feedparser
                    feed = feedparser.parse(page_source)
                    if not feed.entries:
                        print("No entries found with Selenium")
                        return []
                    entries = []
                    for entry in feed.entries[:num_entries]:
                        entries.append({
                            "title": entry.title,
                            "link": entry.link
                        })
                    print(f"Successfully fetched {len(entries)} entries with Selenium")
                    return entries
                except Exception as e:
                    print(f"Selenium failed: {e}")
                    return []
                finally:
                    if driver:
                        driver.quit()
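            # Headless Chrome loads the feed URL and the rendered page_source is parsed
            # by feedparser; the fixed 5-second sleep is a simple page-load wait rather
            # than an explicit WebDriverWait condition.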
            def get_blog_info_with_proxies(feed_url, num_entries=20):
                """Try with rotating proxies"""
                print("Trying with proxies...")
                proxies = get_free_proxies()
                if not proxies:
                    print("No proxies available")
                    return []
                # Shuffle proxies for random selection
                random.shuffle(proxies)
                for i, proxy in enumerate(proxies[:10]): # Try first 10 proxies
                    try:
                        print(f"Trying proxy {i+1}/{min(10, len(proxies))}: {proxy['http']}")
                        scraper = cloudscraper.create_scraper()
                        scraper.proxies.update(proxy)
                        scraper.headers.update({
                            'User-Agent': ua.random,
                            'Accept': 'application/rss+xml, application/xml, text/xml',
                            'Accept-Language': 'en-US,en;q=0.9',
                        })
                        response = scraper.get(feed_url, timeout=15)
                        response.raise_for_status()
                        feed = feedparser.parse(response.content)
                        if feed.entries:
                            entries = []
                            for entry in feed.entries[:num_entries]:
                                entries.append({
                                    "title": entry.title,
                                    "link": entry.link
                                })
                            print(f"Success with proxy! Fetched {len(entries)} entries")
                            return entries
                    except Exception as e:
                        print(f"Proxy {proxy['http']} failed: {e}")
                        continue
                return []
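            # Proxies are tried in random order; any proxy that fails or returns an
            # unparsable feed is skipped and the next one is attempted.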
            def get_blog_info_rss2json(feed_url, num_entries=20):
                """Use RSS2JSON API as a proxy"""
                print("Trying RSS2JSON API...")
                try:
                    api_key = os.environ.get('RSS2JSON_API_KEY')
                    if not api_key:
                        print("RSS2JSON API key not found in environment variables")
                        return []
                    print(f"Using RSS2JSON API key: {api_key[:8]}...")
                    # Use params dict for cleaner URL construction
                    params = {
                        'rss_url': feed_url,
                        'api_key': api_key,
                        'count': num_entries
                    }
                    api_url = "https://api.rss2json.com/v1/api.json"
                    response = requests.get(api_url, params=params, timeout=30)
                    response.raise_for_status()
                    data = response.json()
                    print(f"RSS2JSON response status: {data.get('status')}")
                    if data.get('status') == 'ok' and data.get('items'):
                        entries = []
                        for item in data['items']:
                            title = item.get('title', '').strip()
                            link = item.get('link', '').strip()
                            if title and link:
                                entries.append({
                                    "title": title,
                                    "link": link
                                })
                        print(f"Successfully fetched {len(entries)} entries via RSS2JSON")
                        print(f"Feed title: {data.get('feed', {}).get('title', 'Unknown')}")
                        return entries
                    else:
                        error_msg = data.get('message', 'Unknown error')
                        print(f"RSS2JSON API error: {error_msg}")
                        print(f"Full response: {data}")
                        return []
                except Exception as e:
                    print(f"RSS2JSON failed: {e}")
                    return []
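            # rss2json.com fetches the feed on its own servers and returns the items as
            # JSON, so the request to Substack does not originate from the GitHub runner.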
            def update_markdown_file(filename, blog_info, start_marker, end_marker):
                """Rewrite the region between start_marker and end_marker with a bullet list of posts"""
                if not os.path.exists(filename):
                    print(f"File {filename} not found. Creating directory structure...")
                    os.makedirs(os.path.dirname(filename), exist_ok=True)
                    with open(filename, 'w', encoding='utf-8') as f:
                        f.write(f"# Substack Blogs\n\n{start_marker}\n\n{end_marker}\n")
                with open(filename, 'r', encoding='utf-8') as f:
                    content = f.read()
                start_idx = content.find(start_marker)
                end_idx = content.find(end_marker)
                if start_idx == -1 or end_idx == -1:
                    print("Markers not found in file. Adding them...")
                    content += f"\n\n{start_marker}\n\n{end_marker}\n"
                    start_idx = content.find(start_marker)
                    end_idx = content.find(end_marker)
                start_idx += len(start_marker)
                new_section = "\n".join(f"* [{item['title']}]({item['link']})" for item in blog_info)
                updated = content[:start_idx] + "\n" + new_section + "\n" + content[end_idx:]
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(updated)
                print(f"Updated {filename}")
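            # docs/substack_blogs.md keeps a managed region that looks roughly like:
            #   <!-- START_MARKER -->
            #   * [Post title](https://example.substack.com/p/post)
            #   <!-- END_MARKER -->
            # Only the text between the markers is rewritten; everything else in the
            # file is left untouched.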
            def fetch_with_all_methods(feed_url, num_entries=20):
                """Try all available methods"""
                methods = [
                    get_blog_info_rss2json, # API proxy - often works
                    get_blog_info_cloudscraper, # CloudFlare bypass
                    get_blog_info_with_proxies, # Rotating proxies
                    get_blog_info_selenium # Browser automation
                ]
                for method_num, method in enumerate(methods, 1):
                    print(f"\n=== Method {method_num}: {method.__name__} ===")
                    try:
                        result = method(feed_url, num_entries)
                        if result:
                            print(f"✅ Success with method {method_num}!")
                            return result
                        else:
                            print(f"❌ Method {method_num} returned no results")
                    except Exception as e:
                        print(f"❌ Method {method_num} failed with error: {e}")
                    # Small delay between methods
                    time.sleep(2)
                return []
            # Main execution
            feed_url = "https://datacommons.substack.com/feed"
            print(f"Starting enhanced fetch for: {feed_url}")
            # Check if we have the API key
            api_key = os.environ.get('RSS2JSON_API_KEY')
            if api_key:
                print(f"RSS2JSON API key detected: {api_key[:8]}...")
            else:
                print("⚠️ RSS2JSON API key not found - will skip RSS2JSON method")
            blog_info = fetch_with_all_methods(feed_url)
            filename = "docs/substack_blogs.md"
            if blog_info:
                print(f"\n✅ Successfully fetched {len(blog_info)} blog posts!")
                update_markdown_file(filename, blog_info, "<!-- START_MARKER -->", "<!-- END_MARKER -->")
            else:
                print("\n❌ All methods failed.")
                print("Creating placeholder entry...")
                placeholder_info = [{
                    "title": f"Fetch failed - {time.strftime('%Y-%m-%d %H:%M:%S')}",
                    "link": feed_url
                }]
                update_markdown_file(filename, placeholder_info, "<!-- START_MARKER -->", "<!-- END_MARKER -->")
      - name: Debug file
        run: |
          if [ -f docs/substack_blogs.md ]; then
            echo "File exists. Content:"
            cat docs/substack_blogs.md
          else
            echo "File does not exist"
            ls -la docs/ || echo "docs directory doesn't exist"
          fi
      - name: Commit changes
        run: |
          git config --local user.email "action@github.com"
          git config --local user.name "GitHub Action"
          mkdir -p docs
          git add docs/substack_blogs.md
          if ! git diff --staged --quiet; then
            git commit -m "Update Substack blog links [$(date +%s)]"
            echo "Changes committed"
          else
            # Nothing changed: append a timestamp comment so a commit is still made
            echo "<!-- Updated: $(date) -->" >> docs/substack_blogs.md
            git add docs/substack_blogs.md
            git commit -m "Force update timestamp [$(date +%s)]"
          fi
          git push origin master